diff --git a/CMakeLists.txt b/CMakeLists.txt index adc73d7..56a1d81 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,7 +76,7 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" ) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) IF ( HAVE_STASIS ) - ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) + ADD_LIBRARY(logstore requestDispatch.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) CREATE_EXECUTABLE(server) ENDIF ( HAVE_STASIS ) ADD_LIBRARY(logstore_client tcpclient.cpp) diff --git a/logserver.cpp b/logserver.cpp index 03de589..8da0650 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -5,9 +5,7 @@ #include "merger.h" #include "logstore.h" -#include "regionAllocator.h" - -#include "network.h" +#include "requestDispatch.h" #include #include @@ -17,19 +15,10 @@ #include #include -#include - #include #include - -#undef begin -#undef end -#undef try - - void *serverLoop(void *args); - void logserver::startserver(logtable *ltable) { sys_alive = true; @@ -344,7 +333,6 @@ void *serverLoop(void *args) struct sockaddr_in serv_addr; struct sockaddr_in cli_addr; int newsockfd; //newly created - socklen_t clilen = sizeof(cli_addr); //open a socket sockfd = socket(AF_INET, SOCK_STREAM, 0); @@ -379,6 +367,8 @@ void *serverLoop(void *args) int flag, result; while(true) { + socklen_t clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); if (newsockfd < 0) { @@ -427,428 +417,6 @@ void *serverLoop(void *args) } } -int op_insert(pthread_data* data, datatuple * tuple) { - //insert/update/delete - data->ltable->insertTuple(tuple); - //step 4: send response - return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); -} -int op_find(pthread_data* data, datatuple * tuple) 
{ - //find the tuple - datatuple *dt = data->ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); - - #ifdef STATS_ENABLED - - if(dt == 0) { - DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str()); - } else if( dt->datalen() != 1024) { - DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(), - dt->datalen); - if(datatuple::compare(tuple->key(), tuple->keylen(), dt->key(), dt->keylen()) != 0) { - DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(), - datatuple::key_to_str(dt->key).c_str()); - } - - } - #endif - - bool dt_needs_free; - if(dt == 0) //tuple does not exist. - { - dt = tuple; - dt->setDelete(); - dt_needs_free = false; - } else { - dt_needs_free = true; - } - DEBUG(stderr, "find result: %s\n", dt->isDelete() ? "not found" : "found"); - //send the reply code - int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); - if(!err) { - //send the tuple - err = writetupletosocket(*(data->workitem), dt); - } - if(!err) { - writeendofiteratortosocket(*(data->workitem)); - } - //free datatuple - if(dt_needs_free) { - datatuple::freetuple(dt); - } - return err; -} -int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t limit) { - size_t count = 0; - int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); - - if(!err) { - logtable::iterator * itr = new logtable::iterator(data->ltable, tuple); - datatuple * t; - while(!err && (t = itr->getnext())) { - if(tuple2) { // are we at the end of range? - if(datatuple::compare_obj(t, tuple2) >= 0) { - datatuple::freetuple(t); - break; - } - } - err = writetupletosocket(*(data->workitem), t); - datatuple::freetuple(t); - count ++; - if(count == limit) { break; } // did we hit limit? 
- } - delete itr; - } - if(!err) { writeendofiteratortosocket(*(data->workitem)); } - return err; -} -int op_flush(pthread_data* data) { - data->ltable->flushTable(); - return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); -} -int op_shutdown(pthread_data* data) { - // XXX - return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR); -} -int op_stat_space_usage(pthread_data* data) { - - - int xid = Tbegin(); - - rwlc_readlock(data->ltable->header_mut); - - /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; - pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; - pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length; - pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count; - - pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count); - - pageid_t * datapage_c1_mergeable_regions = NULL; - if(data->ltable->get_tree_c1_mergeable()) { - datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); - } - pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); - - pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count); - - pageid_t * tree_c1_mergeable_regions = NULL; - if(data->ltable->get_tree_c1_mergeable()) { - tree_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); - } - - pageid_t * tree_c2_regions = 
data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count); - */ - - pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length; - pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count; - pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions; - - pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; - pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; - pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions; - - data->ltable->get_tree_c1()->list_regions(xid, - &internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions, - &datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions); - if(data->ltable->get_tree_c1_mergeable()) { - data->ltable->get_tree_c1_mergeable()->list_regions(xid, - &internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions, - &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions); - - } - data->ltable->get_tree_c2()->list_regions(xid, - &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions, - &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions); - - - free(datapage_c1_regions); - free(datapage_c1_mergeable_regions); - free(datapage_c2_regions); - - free(internal_c1_regions); - free(internal_c1_mergeable_regions); - free(internal_c2_regions); - - - uint64_t treesize = PAGE_SIZE * - ( ( datapage_c1_region_count * datapage_c1_region_length ) - + ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length ) - + ( datapage_c2_region_count * datapage_c2_region_length) - + ( internal_c1_region_count * internal_c1_region_length ) - + ( 
internal_c1_mergeable_region_count * internal_c1_mergeable_region_length ) - + ( internal_c2_region_count * internal_c2_region_length) ); - - boundary_tag tag; - pageid_t pid = ROOT_RECORD.page; - TregionReadBoundaryTag(xid, pid, &tag); - uint64_t max_off = 0; - do { - max_off = pid + tag.size; - ; - } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/)); - - rwlc_unlock(data->ltable->header_mut); - - Tcommit(xid); - - uint64_t filesize = max_off * PAGE_SIZE; - datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize)); - - DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize); - - int err = 0; - if(!err){ err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); } - if(!err){ err = writetupletosocket(*(data->workitem), tup); } - if(!err){ err = writeendofiteratortosocket(*(data->workitem)); } - - - datatuple::freetuple(tup); - - return err; -} -int op_stat_perf_report(pthread_data* data) { - -} - - -int op_stat_histogram(pthread_data* data, size_t limit) { - - if(limit < 3) { - return writeoptosocket(*(data->workitem), LOGSTORE_PROTOCOL_ERROR); - } - - int xid = Tbegin(); - RegionAllocator * ro_alloc = new RegionAllocator(); - diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); - size_t count = 0; - int err = 0; - - while(it->next()) { count++; } - it->close(); - delete(it); - - uint64_t stride; - - if(count > limit) { - stride = count / (limit-1); - stride++; // this way, we truncate the last bucket instead of occasionally creating a tiny last bucket. 
- } else { - stride = 1; - } - - datatuple * tup = datatuple::create(&stride, sizeof(stride)); - - if(!err) { err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); } - if(!err) { err = writetupletosocket(*(data->workitem), tup); } - - datatuple::freetuple(tup); - - size_t cur_stride = 0; - size_t i = 0; - it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private? - while(it->next()) { - i++; - if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys) - byte * key; - size_t keylen= it->key(&key); - tup = datatuple::create(key, keylen); - - if(!err) { err = writetupletosocket(*(data->workitem), tup); } - - datatuple::freetuple(tup); - cur_stride = stride; - } - cur_stride--; - } - - it->close(); - delete(it); - delete(ro_alloc); - if(!err){ err = writeendofiteratortosocket(*(data->workitem)); } - Tcommit(xid); - return err; -} -int op_dbg_blockmap(pthread_data* data) { - // produce a list of stasis regions - int xid = Tbegin(); - - rwlc_readlock(data->ltable->header_mut); - - // produce a list of regions used by current tree components - /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; - pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; - pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count); - pageid_t * datapage_c1_mergeable_regions = NULL; - if(data->ltable->get_tree_c1_mergeable()) { - datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); - } - pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, 
&datapage_c2_region_length, &datapage_c2_region_count); */ - - - /* pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count); - - pageid_t * tree_c1_mergeable_regions = NULL; - if(data->ltable->get_tree_c1_mergeable()) { - tree_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); - } - pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count); */ - - pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length; - pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count; - pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions; - - pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; - pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; - pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions; - - data->ltable->get_tree_c1()->list_regions(xid, - &internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions, - &datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions); - if(data->ltable->get_tree_c1_mergeable()) { - data->ltable->get_tree_c1_mergeable()->list_regions(xid, - &internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions, - &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions); - - } - data->ltable->get_tree_c2()->list_regions(xid, - &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions, - &datapage_c2_region_length, &datapage_c2_region_count, 
&datapage_c2_regions); - - rwlc_unlock(data->ltable->header_mut); - - Tcommit(xid); - - printf("C1 Datapage Regions (each is %lld pages long):\n", datapage_c1_region_length); - for(pageid_t i = 0; i < datapage_c1_region_count; i++) { - printf("%lld ", datapage_c1_regions[i]); - } - - printf("\nC1 Internal Node Regions (each is %lld pages long):\n", internal_c1_region_length); - for(pageid_t i = 0; i < internal_c1_region_count; i++) { - printf("%lld ", internal_c1_regions[i]); - } - - printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length); - for(pageid_t i = 0; i < datapage_c2_region_count; i++) { - printf("%lld ", datapage_c2_regions[i]); - } - - printf("\nC2 Internal Node Regions (each is %lld pages long):\n", internal_c2_region_length); - for(pageid_t i = 0; i < internal_c2_region_count; i++) { - printf("%lld ", internal_c2_regions[i]); - } - printf("\nStasis Region Map\n"); - - boundary_tag tag; - pageid_t pid = ROOT_RECORD.page; - TregionReadBoundaryTag(xid, pid, &tag); - pageid_t max_off = 0; - bool done; - do { - max_off = pid + tag.size; - // print tag. - printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager); - done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/); - } while(!done); - - printf("\n"); - - printf("Tree components are using %lld megabytes. 
File is using %lld megabytes.\n", - PAGE_SIZE * (internal_c1_region_length * internal_c1_region_count - + internal_c1_mergeable_region_length * internal_c1_mergeable_region_count - + internal_c2_region_length * internal_c2_region_count - + datapage_c1_region_length * datapage_c1_region_count - + datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count - + datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024), - (PAGE_SIZE * max_off) / (1024*1024)); - - free(datapage_c1_regions); - if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions); - free(datapage_c2_regions); - free(internal_c1_regions); - if(internal_c1_mergeable_regions) free(internal_c1_mergeable_regions); - free(internal_c2_regions); - return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); -} - -int op_dbg_drop_database(pthread_data * data) { - logtable::iterator * itr = new logtable::iterator(data->ltable); - datatuple * del; - fprintf(stderr, "DROPPING DATABASE...\n"); - long long n = 0; - while((del = itr->getnext())) { - if(!del->isDelete()) { - del->setDelete(); - data->ltable->insertTuple(del); - n++; - if(!(n % 1000)) { - printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout); - } - } else { - n++; - if(!(n % 1000)) { - printf("? 
%lld %s\n", n, (char*)del->key()); fflush(stdout); - } - } - datatuple::freetuple(del); - } - delete itr; - fprintf(stderr, "...DROP DATABASE COMPLETE\n"); - return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); -} -int op_dbg_noop(pthread_data * data) { - return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); -} - -int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, pthread_data* data) { - int err = 0; - if(opcode == OP_INSERT) - { - err = op_insert(data, tuple); - } - else if(opcode == OP_FIND) - { - err = op_find(data, tuple); - } - else if(opcode == OP_SCAN) - { - size_t limit = readcountfromsocket(*(data->workitem), &err); - if(!err) { err = op_scan(data, tuple, tuple2, limit); } - } - else if(opcode == OP_FLUSH) - { - err = op_flush(data); - } - else if(opcode == OP_SHUTDOWN) - { - err = op_shutdown(data); - } - else if(opcode == OP_STAT_SPACE_USAGE) - { - err = op_stat_space_usage(data); - } - else if(opcode == OP_STAT_PERF_REPORT) - { - err = op_stat_perf_report(data); - } - else if(opcode == OP_STAT_HISTOGRAM) - { - size_t limit = readcountfromsocket(*(data->workitem), &err); - err = op_stat_histogram(data, limit); - } - else if(opcode == OP_DBG_BLOCKMAP) - { - err = op_dbg_blockmap(data); - } - else if(opcode == OP_DBG_DROP_DATABASE) - { - err = op_dbg_drop_database(data); - } - else if(opcode == OP_DBG_NOOP) { - err = op_dbg_noop(data); - } - return err; -} void * thread_work_fn( void * args) { @@ -876,6 +444,8 @@ void * thread_work_fn( void * args) break; } + // XXX move this logserver error handling logic into requestDispatch.cpp + //step 1: read the opcode network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST); if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { @@ -892,7 +462,7 @@ void * thread_work_fn( void * args) if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); } //step 3: process the tuple - if(!err) { err = dispatch_request(opcode, tuple, 
tuple2, item->data); } + if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); } //free the tuple if(tuple) datatuple::freetuple(tuple);
diff --git a/requestDispatch.cpp b/requestDispatch.cpp
new file mode 100644
index 0000000..b0900f3
--- /dev/null
+++ b/requestDispatch.cpp
@@ -0,0 +1,406 @@
+/*
+ * requestDispatch.cpp
+ *
+ *  Created on: Aug 11, 2010
+ *      Author: sears
+ */
+#include "requestDispatch.h"
+#include "regionAllocator.h"
+
+/*
+ * Per-opcode request handlers.  Each op_* function runs one operation against
+ * ltable, writes its response to the socket fd, and returns the status of the
+ * socket helpers it called; the code in this file treats zero as success.
+ */
+
+/* Apply an insert/update/delete tuple to the table, then acknowledge it. */
+static inline int op_insert(logtable * ltable, int fd, datatuple * tuple) {
+  ltable->insertTuple(tuple);
+  return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
+}
+
+/*
+ * Look up tuple's key and send the result back as a one-tuple stream.  When
+ * nothing is found, the request tuple itself is marked deleted and sent.
+ */
+static inline int op_find(logtable * ltable, int fd, datatuple * tuple) {
+  datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
+
+#ifdef STATS_ENABLED
+  if(dt == 0) {
+    DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple->key()).c_str());
+  } else if(dt->datalen() != 1024) {
+    DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple->key()).c_str(),
+          (int)dt->datalen());
+    if(datatuple::compare(tuple->key(), tuple->keylen(), dt->key(), dt->keylen()) != 0) {
+      DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple->key()).c_str(),
+            datatuple::key_to_str(dt->key()).c_str());
+    }
+  }
+#endif
+
+  bool dt_needs_free;
+  if(dt == 0) {
+    // Not found: reply with the request tuple, marked as deleted.  It is not
+    // freed here; only a tuple returned by findTuple_first() is.
+    dt = tuple;
+    dt->setDelete();
+    dt_needs_free = false;
+  } else {
+    dt_needs_free = true;
+  }
+  DEBUG("find result: %s\n", dt->isDelete() ? "not found" : "found");
+
+  int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
+  if(!err) { err = writetupletosocket(fd, dt); }
+  // Propagate a failed end-of-stream write instead of silently dropping it.
+  if(!err) { err = writeendofiteratortosocket(fd); }
+
+  if(dt_needs_free) { datatuple::freetuple(dt); }
+  return err;
+}
+
+/*
+ * Stream tuples from a logtable::iterator constructed with tuple.  Stops at
+ * the first tuple comparing >= tuple2 (when tuple2 is non-null), once limit
+ * tuples have been sent, or on a write error.
+ */
+static inline int op_scan(logtable * ltable, int fd, datatuple * tuple, datatuple * tuple2, size_t limit) {
+  size_t count = 0;
+  int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
+
+  if(!err) {
+    logtable::iterator * itr = new logtable::iterator(ltable, tuple);
+    datatuple * t;
+    while(!err && (t = itr->getnext())) {
+      if(tuple2 && datatuple::compare_obj(t, tuple2) >= 0) { // are we at the end of range?
+        datatuple::freetuple(t);
+        break;
+      }
+      err = writetupletosocket(fd, t);
+      datatuple::freetuple(t);
+      count++;
+      if(count == limit) { break; } // did we hit limit?
+    }
+    delete itr;
+  }
+  // Propagate a failed end-of-stream write instead of silently dropping it.
+  if(!err) { err = writeendofiteratortosocket(fd); }
+  return err;
+}
+
+/* Flush the table, then acknowledge. */
+static inline int op_flush(logtable * ltable, int fd) {
+  ltable->flushTable();
+  return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
+}
+
+/* Shutdown is not implemented yet (XXX); report that to the client. */
+static inline int op_shutdown(logtable * ltable, int fd) {
+  return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR);
+}
+
+/*
+ * Report space usage as a single tuple built from two uint64_t values: the
+ * bytes covered by the tree components' regions, and the file size implied by
+ * the end of the last Stasis boundary tag visited.
+ */
+static inline int op_stat_space_usage(logtable * ltable, int fd) {
+  int xid = Tbegin();
+
+  rwlc_readlock(ltable->header_mut);
+
+  pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
+  pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
+  pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
+
+  pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
+  pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
+  pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
+
+  ltable->get_tree_c1()->list_regions(xid,
+      &internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
+      &datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
+  if(ltable->get_tree_c1_mergeable()) {
+    ltable->get_tree_c1_mergeable()->list_regions(xid,
+        &internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
+        &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
+  }
+  ltable->get_tree_c2()->list_regions(xid,
+      &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
+      &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
+
+  // Only the counts and lengths are used below; release the region lists now.
+  // The mergeable pointers stay NULL when that component is absent.
+  free(datapage_c1_regions);
+  free(datapage_c1_mergeable_regions);
+  free(datapage_c2_regions);
+  free(internal_c1_regions);
+  free(internal_c1_mergeable_regions);
+  free(internal_c2_regions);
+
+  uint64_t treesize = PAGE_SIZE *
+      ( ( datapage_c1_region_count * datapage_c1_region_length )
+      + ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
+      + ( datapage_c2_region_count * datapage_c2_region_length )
+      + ( internal_c1_region_count * internal_c1_region_length )
+      + ( internal_c1_mergeable_region_count * internal_c1_mergeable_region_length )
+      + ( internal_c2_region_count * internal_c2_region_length ) );
+
+  // Walk every boundary tag, remembering where the last one visited ends.
+  boundary_tag tag;
+  pageid_t pid = ROOT_RECORD.page;
+  TregionReadBoundaryTag(xid, pid, &tag);
+  uint64_t max_off = 0;
+  do {
+    max_off = pid + tag.size;
+  } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
+
+  rwlc_unlock(ltable->header_mut);
+
+  Tcommit(xid);
+
+  uint64_t filesize = max_off * PAGE_SIZE;
+  datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize));
+
+  DEBUG("tree size: %lld, filesize %lld\n", (long long)treesize, (long long)filesize);
+
+  int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
+  if(!err){ err = writetupletosocket(fd, tup); }
+  if(!err){ err = writeendofiteratortosocket(fd); }
+
+  datatuple::freetuple(tup);
+
+  return err;
+}
+
+/*
+ * Performance reports are not implemented.  The body used to be empty, which
+ * falls off the end of a function returning int (undefined behavior); tell the
+ * client explicitly instead, the same way op_shutdown does.
+ */
+static inline int op_stat_perf_report(logtable * ltable, int fd) {
+  return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR);
+}
+
+/*
+ * Send a sample of the keys found by iterating C2's internal nodes: first a
+ * tuple holding the stride (uint64_t), then the first key, every stride-th key
+ * after it, and the last key.  limit < 3 gets LOGSTORE_PROTOCOL_ERROR.
+ */
+static inline int op_stat_histogram(logtable * ltable, int fd, size_t limit) {
+  if(limit < 3) {
+    return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
+  }
+
+  int xid = Tbegin();
+  RegionAllocator * ro_alloc = new RegionAllocator();
+  diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, ltable->get_tree_c2()->get_root_rid());
+  size_t count = 0;
+  int err = 0;
+
+  // First pass: count the keys so that the stride can be chosen.
+  while(it->next()) { count++; }
+  it->close();
+  delete(it);
+
+  uint64_t stride;
+  if(count > limit) {
+    stride = count / (limit-1);
+    stride++; // this way, we truncate the last bucket instead of occasionally creating a tiny last bucket.
+  } else {
+    stride = 1;
+  }
+
+  datatuple * tup = datatuple::create(&stride, sizeof(stride));
+
+  if(!err) { err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES); }
+  if(!err) { err = writetupletosocket(fd, tup); }
+
+  datatuple::freetuple(tup);
+
+  // Second pass: send the sampled keys.
+  size_t cur_stride = 0;
+  size_t i = 0;
+  it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
+  while(it->next()) {
+    i++;
+    if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
+      byte * key;
+      size_t keylen = it->key(&key);
+      tup = datatuple::create(key, keylen);
+
+      if(!err) { err = writetupletosocket(fd, tup); }
+
+      datatuple::freetuple(tup);
+      cur_stride = stride;
+    }
+    cur_stride--;
+  }
+
+  it->close();
+  delete(it);
+  delete(ro_alloc);
+  if(!err){ err = writeendofiteratortosocket(fd); }
+  Tcommit(xid);
+  return err;
+}
+
+/*
+ * Debugging aid: print the regions used by the C1 and C2 tree components and
+ * the Stasis boundary-tag map to stdout, then acknowledge.
+ */
+static inline int op_dbg_blockmap(logtable * ltable, int fd) {
+  int xid = Tbegin();
+
+  rwlc_readlock(ltable->header_mut);
+
+  // produce a list of regions used by current tree components
+  pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
+  pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
+  pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
+
+  pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
+  pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
+  pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
+
+  ltable->get_tree_c1()->list_regions(xid,
+      &internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
+      &datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
+  if(ltable->get_tree_c1_mergeable()) {
+    ltable->get_tree_c1_mergeable()->list_regions(xid,
+        &internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
+        &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
+  }
+  ltable->get_tree_c2()->list_regions(xid,
+      &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
+      &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
+
+  rwlc_unlock(ltable->header_mut);
+
+  printf("C1 Datapage Regions (each is %lld pages long):\n", (long long)datapage_c1_region_length);
+  for(pageid_t i = 0; i < datapage_c1_region_count; i++) {
+    printf("%lld ", (long long)datapage_c1_regions[i]);
+  }
+
+  printf("\nC1 Internal Node Regions (each is %lld pages long):\n", (long long)internal_c1_region_length);
+  for(pageid_t i = 0; i < internal_c1_region_count; i++) {
+    printf("%lld ", (long long)internal_c1_regions[i]);
+  }
+
+  printf("\nC2 Datapage Regions (each is %lld pages long):\n", (long long)datapage_c2_region_length);
+  for(pageid_t i = 0; i < datapage_c2_region_count; i++) {
+    printf("%lld ", (long long)datapage_c2_regions[i]);
+  }
+
+  printf("\nC2 Internal Node Regions (each is %lld pages long):\n", (long long)internal_c2_region_length);
+  for(pageid_t i = 0; i < internal_c2_region_count; i++) {
+    printf("%lld ", (long long)internal_c2_regions[i]);
+  }
+  printf("\nStasis Region Map\n");
+
+  // produce a list of stasis regions (these reads are issued under xid)
+  boundary_tag tag;
+  pageid_t pid = ROOT_RECORD.page;
+  TregionReadBoundaryTag(xid, pid, &tag);
+  pageid_t max_off = 0;
+  bool done;
+  do {
+    max_off = pid + tag.size;
+    // print tag.
+    printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
+    done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
+  } while(!done);
+
+  // Commit only after the boundary-tag walk: it used to run before the walk,
+  // so the reads above were issued with an already-committed transaction id.
+  Tcommit(xid);
+
+  printf("\n");
+
+  printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
+         (long long)(PAGE_SIZE * (internal_c1_region_length * internal_c1_region_count
+             + internal_c1_mergeable_region_length * internal_c1_mergeable_region_count
+             + internal_c2_region_length * internal_c2_region_count
+             + datapage_c1_region_length * datapage_c1_region_count
+             + datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
+             + datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024)),
+         (long long)((PAGE_SIZE * max_off) / (1024*1024)));
+
+  free(datapage_c1_regions);
+  free(datapage_c1_mergeable_regions);
+  free(datapage_c2_regions);
+  free(internal_c1_regions);
+  free(internal_c1_mergeable_regions);
+  free(internal_c2_regions);
+  return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
+}
+
+/*
+ * Debugging aid: iterate over the whole table and re-insert every tuple that
+ * is not already marked deleted as a delete.  Progress goes to stdout every
+ * 1000 tuples ("X" = deleted by this call, "?" = was already a delete).
+ */
+static inline int op_dbg_drop_database(logtable * ltable, int fd) {
+  logtable::iterator * itr = new logtable::iterator(ltable);
+  datatuple * del;
+  fprintf(stderr, "DROPPING DATABASE...\n");
+  long long n = 0;
+  while((del = itr->getnext())) {
+    const char * mark = "?"; // tuple was already marked deleted
+    if(!del->isDelete()) {
+      del->setDelete();
+      ltable->insertTuple(del);
+      mark = "X"; // tuple has just been deleted by this call
+    }
+    n++;
+    if(!(n % 1000)) {
+      printf("%s %lld %s\n", mark, n, (char*)del->key()); fflush(stdout);
+    }
+    datatuple::freetuple(del);
+  }
+  delete itr;
+  fprintf(stderr, "...DROP DATABASE COMPLETE\n");
+  return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
+}
+
+/* No-op used for debugging: just acknowledge. */
+static inline int op_dbg_noop(logtable * ltable, int fd) {
+  return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
+}
+
+/*
+ * Route one request to its handler.  OP_SCAN and OP_STAT_HISTOGRAM first read
+ * a count from fd and are skipped if that read reports an error.  An opcode
+ * that matches none of the cases below sends nothing and returns 0.
+ * Returns the handler's status; zero is treated as success.
+ */
+int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, int fd) {
+  int err = 0;
+  if(opcode == OP_INSERT) {
+    err = op_insert(ltable, fd, tuple);
+  } else if(opcode == OP_FIND) {
+    err = op_find(ltable, fd, tuple);
+  } else if(opcode == OP_SCAN) {
+    size_t limit = readcountfromsocket(fd, &err);
+    if(!err) { err = op_scan(ltable, fd, tuple, tuple2, limit); }
+  } else if(opcode == OP_FLUSH) {
+    err = op_flush(ltable, fd);
+  } else if(opcode == OP_SHUTDOWN) {
+    err = op_shutdown(ltable, fd);
+  } else if(opcode == OP_STAT_SPACE_USAGE) {
+    err = op_stat_space_usage(ltable, fd);
+  } else if(opcode == OP_STAT_PERF_REPORT) {
+    err = op_stat_perf_report(ltable, fd);
+  } else if(opcode == OP_STAT_HISTOGRAM) {
+    size_t limit = readcountfromsocket(fd, &err);
+    // Don't run the histogram with a limit that failed to read (as OP_SCAN).
+    if(!err) { err = op_stat_histogram(ltable, fd, limit); }
+  } else if(opcode == OP_DBG_BLOCKMAP) {
+    err = op_dbg_blockmap(ltable, fd);
+  } else if(opcode == OP_DBG_DROP_DATABASE) {
+    err = op_dbg_drop_database(ltable, fd);
+  } else if(opcode == OP_DBG_NOOP) {
+    err = op_dbg_noop(ltable, fd);
+  }
+  return err;
+}
diff --git a/requestDispatch.h b/requestDispatch.h
new file mode 100644
index 0000000..0f59c9f
--- /dev/null
+++ b/requestDispatch.h
@@ -0,0 +1,20 @@
+/*
+ * requestDispatch.h
+ *
+ *  Created on: Aug 11, 2010
+ *      Author: sears
+ */
+
+#ifndef REQUESTDISPATCH_H_
+#define REQUESTDISPATCH_H_
+#include "network.h"
+#include "datatuple.h"
+#include "logstore.h"
+
+/*
+ * Execute the request identified by opcode against ltable and write the
+ * response to the socket fd.  Returns the handler's status (zero = success).
+ */
+int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, int fd);
+
+#endif /* REQUESTDISPATCH_H_ */