moved most of the request dispatching logic from logserver.cpp to requestDispatch.cpp

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@985 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-08-11 17:43:39 +00:00
parent 4a42e4f904
commit 569b46363b
4 changed files with 454 additions and 437 deletions

View file

@ -76,7 +76,7 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
IF ( HAVE_STASIS ) IF ( HAVE_STASIS )
ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) ADD_LIBRARY(logstore requestDispatch.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp)
CREATE_EXECUTABLE(server) CREATE_EXECUTABLE(server)
ENDIF ( HAVE_STASIS ) ENDIF ( HAVE_STASIS )
ADD_LIBRARY(logstore_client tcpclient.cpp) ADD_LIBRARY(logstore_client tcpclient.cpp)

View file

@ -5,9 +5,7 @@
#include "merger.h" #include "merger.h"
#include "logstore.h" #include "logstore.h"
#include "regionAllocator.h" #include "requestDispatch.h"
#include "network.h"
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -17,19 +15,10 @@
#include <sys/select.h> #include <sys/select.h>
#include <errno.h> #include <errno.h>
#include <stasis/operations/regions.h>
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#undef begin
#undef end
#undef try
void *serverLoop(void *args); void *serverLoop(void *args);
void logserver::startserver(logtable<datatuple> *ltable) void logserver::startserver(logtable<datatuple> *ltable)
{ {
sys_alive = true; sys_alive = true;
@ -344,7 +333,6 @@ void *serverLoop(void *args)
struct sockaddr_in serv_addr; struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr; struct sockaddr_in cli_addr;
int newsockfd; //newly created int newsockfd; //newly created
socklen_t clilen = sizeof(cli_addr);
//open a socket //open a socket
sockfd = socket(AF_INET, SOCK_STREAM, 0); sockfd = socket(AF_INET, SOCK_STREAM, 0);
@ -379,6 +367,8 @@ void *serverLoop(void *args)
int flag, result; int flag, result;
while(true) while(true)
{ {
socklen_t clilen = sizeof(cli_addr);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0) if (newsockfd < 0)
{ {
@ -427,428 +417,6 @@ void *serverLoop(void *args)
} }
} }
int op_insert(pthread_data* data, datatuple * tuple) {
//insert/update/delete
data->ltable->insertTuple(tuple);
//step 4: send response
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}
int op_find(pthread_data* data, datatuple * tuple) {
//find the tuple
datatuple *dt = data->ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
#ifdef STATS_ENABLED
if(dt == 0) {
DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str());
} else if( dt->datalen() != 1024) {
DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(),
dt->datalen);
if(datatuple::compare(tuple->key(), tuple->keylen(), dt->key(), dt->keylen()) != 0) {
DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(),
datatuple::key_to_str(dt->key).c_str());
}
}
#endif
bool dt_needs_free;
if(dt == 0) //tuple does not exist.
{
dt = tuple;
dt->setDelete();
dt_needs_free = false;
} else {
dt_needs_free = true;
}
DEBUG(stderr, "find result: %s\n", dt->isDelete() ? "not found" : "found");
//send the reply code
int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
if(!err) {
//send the tuple
err = writetupletosocket(*(data->workitem), dt);
}
if(!err) {
writeendofiteratortosocket(*(data->workitem));
}
//free datatuple
if(dt_needs_free) {
datatuple::freetuple(dt);
}
return err;
}
int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t limit) {
size_t count = 0;
int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
if(!err) {
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(data->ltable, tuple);
datatuple * t;
while(!err && (t = itr->getnext())) {
if(tuple2) { // are we at the end of range?
if(datatuple::compare_obj(t, tuple2) >= 0) {
datatuple::freetuple(t);
break;
}
}
err = writetupletosocket(*(data->workitem), t);
datatuple::freetuple(t);
count ++;
if(count == limit) { break; } // did we hit limit?
}
delete itr;
}
if(!err) { writeendofiteratortosocket(*(data->workitem)); }
return err;
}
int op_flush(pthread_data* data) {
data->ltable->flushTable();
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}
int op_shutdown(pthread_data* data) {
// XXX
return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR);
}
int op_stat_space_usage(pthread_data* data) {
int xid = Tbegin();
rwlc_readlock(data->ltable->header_mut);
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
tree_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count);
*/
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
data->ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(data->ltable->get_tree_c1_mergeable()) {
data->ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
data->ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
free(datapage_c1_regions);
free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(internal_c1_regions);
free(internal_c1_mergeable_regions);
free(internal_c2_regions);
uint64_t treesize = PAGE_SIZE *
( ( datapage_c1_region_count * datapage_c1_region_length )
+ ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
+ ( datapage_c2_region_count * datapage_c2_region_length)
+ ( internal_c1_region_count * internal_c1_region_length )
+ ( internal_c1_mergeable_region_count * internal_c1_mergeable_region_length )
+ ( internal_c2_region_count * internal_c2_region_length) );
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
TregionReadBoundaryTag(xid, pid, &tag);
uint64_t max_off = 0;
do {
max_off = pid + tag.size;
;
} while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
rwlc_unlock(data->ltable->header_mut);
Tcommit(xid);
uint64_t filesize = max_off * PAGE_SIZE;
datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize));
DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize);
int err = 0;
if(!err){ err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
if(!err){ err = writetupletosocket(*(data->workitem), tup); }
if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
datatuple::freetuple(tup);
return err;
}
int op_stat_perf_report(pthread_data* data) {
}
int op_stat_histogram(pthread_data* data, size_t limit) {
if(limit < 3) {
return writeoptosocket(*(data->workitem), LOGSTORE_PROTOCOL_ERROR);
}
int xid = Tbegin();
RegionAllocator * ro_alloc = new RegionAllocator();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid());
size_t count = 0;
int err = 0;
while(it->next()) { count++; }
it->close();
delete(it);
uint64_t stride;
if(count > limit) {
stride = count / (limit-1);
stride++; // this way, we truncate the last bucket instead of occasionally creating a tiny last bucket.
} else {
stride = 1;
}
datatuple * tup = datatuple::create(&stride, sizeof(stride));
if(!err) { err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
if(!err) { err = writetupletosocket(*(data->workitem), tup); }
datatuple::freetuple(tup);
size_t cur_stride = 0;
size_t i = 0;
it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
while(it->next()) {
i++;
if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
byte * key;
size_t keylen= it->key(&key);
tup = datatuple::create(key, keylen);
if(!err) { err = writetupletosocket(*(data->workitem), tup); }
datatuple::freetuple(tup);
cur_stride = stride;
}
cur_stride--;
}
it->close();
delete(it);
delete(ro_alloc);
if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
Tcommit(xid);
return err;
}
int op_dbg_blockmap(pthread_data* data) {
// produce a list of stasis regions
int xid = Tbegin();
rwlc_readlock(data->ltable->header_mut);
// produce a list of regions used by current tree components
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); */
/* pageid_t * tree_c1_regions = data->ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(data->ltable->get_tree_c1_mergeable()) {
tree_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = data->ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count); */
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
data->ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(data->ltable->get_tree_c1_mergeable()) {
data->ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
data->ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
rwlc_unlock(data->ltable->header_mut);
Tcommit(xid);
printf("C1 Datapage Regions (each is %lld pages long):\n", datapage_c1_region_length);
for(pageid_t i = 0; i < datapage_c1_region_count; i++) {
printf("%lld ", datapage_c1_regions[i]);
}
printf("\nC1 Internal Node Regions (each is %lld pages long):\n", internal_c1_region_length);
for(pageid_t i = 0; i < internal_c1_region_count; i++) {
printf("%lld ", internal_c1_regions[i]);
}
printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length);
for(pageid_t i = 0; i < datapage_c2_region_count; i++) {
printf("%lld ", datapage_c2_regions[i]);
}
printf("\nC2 Internal Node Regions (each is %lld pages long):\n", internal_c2_region_length);
for(pageid_t i = 0; i < internal_c2_region_count; i++) {
printf("%lld ", internal_c2_regions[i]);
}
printf("\nStasis Region Map\n");
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
TregionReadBoundaryTag(xid, pid, &tag);
pageid_t max_off = 0;
bool done;
do {
max_off = pid + tag.size;
// print tag.
printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
} while(!done);
printf("\n");
printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
PAGE_SIZE * (internal_c1_region_length * internal_c1_region_count
+ internal_c1_mergeable_region_length * internal_c1_mergeable_region_count
+ internal_c2_region_length * internal_c2_region_count
+ datapage_c1_region_length * datapage_c1_region_count
+ datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
+ datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024),
(PAGE_SIZE * max_off) / (1024*1024));
free(datapage_c1_regions);
if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(internal_c1_regions);
if(internal_c1_mergeable_regions) free(internal_c1_mergeable_regions);
free(internal_c2_regions);
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}
int op_dbg_drop_database(pthread_data * data) {
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(data->ltable);
datatuple * del;
fprintf(stderr, "DROPPING DATABASE...\n");
long long n = 0;
while((del = itr->getnext())) {
if(!del->isDelete()) {
del->setDelete();
data->ltable->insertTuple(del);
n++;
if(!(n % 1000)) {
printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout);
}
} else {
n++;
if(!(n % 1000)) {
printf("? %lld %s\n", n, (char*)del->key()); fflush(stdout);
}
}
datatuple::freetuple(del);
}
delete itr;
fprintf(stderr, "...DROP DATABASE COMPLETE\n");
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}
int op_dbg_noop(pthread_data * data) {
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, pthread_data* data) {
int err = 0;
if(opcode == OP_INSERT)
{
err = op_insert(data, tuple);
}
else if(opcode == OP_FIND)
{
err = op_find(data, tuple);
}
else if(opcode == OP_SCAN)
{
size_t limit = readcountfromsocket(*(data->workitem), &err);
if(!err) { err = op_scan(data, tuple, tuple2, limit); }
}
else if(opcode == OP_FLUSH)
{
err = op_flush(data);
}
else if(opcode == OP_SHUTDOWN)
{
err = op_shutdown(data);
}
else if(opcode == OP_STAT_SPACE_USAGE)
{
err = op_stat_space_usage(data);
}
else if(opcode == OP_STAT_PERF_REPORT)
{
err = op_stat_perf_report(data);
}
else if(opcode == OP_STAT_HISTOGRAM)
{
size_t limit = readcountfromsocket(*(data->workitem), &err);
err = op_stat_histogram(data, limit);
}
else if(opcode == OP_DBG_BLOCKMAP)
{
err = op_dbg_blockmap(data);
}
else if(opcode == OP_DBG_DROP_DATABASE)
{
err = op_dbg_drop_database(data);
}
else if(opcode == OP_DBG_NOOP) {
err = op_dbg_noop(data);
}
return err;
}
void * thread_work_fn( void * args) void * thread_work_fn( void * args)
{ {
@ -876,6 +444,8 @@ void * thread_work_fn( void * args)
break; break;
} }
// XXX move this logserver error handling logic into requestDispatch.cpp
//step 1: read the opcode //step 1: read the opcode
network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST); network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST);
if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
@ -892,7 +462,7 @@ void * thread_work_fn( void * args)
if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); } if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); }
//step 3: process the tuple //step 3: process the tuple
if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data); } if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); }
//free the tuple //free the tuple
if(tuple) datatuple::freetuple(tuple); if(tuple) datatuple::freetuple(tuple);

431
requestDispatch.cpp Normal file
View file

@ -0,0 +1,431 @@
/*
* requestDispatch.cpp
*
* Created on: Aug 11, 2010
* Author: sears
*/
#include "requestDispatch.h"
#include "regionAllocator.h"
static inline int op_insert(logtable<datatuple> * ltable, int fd, datatuple * tuple) {
//insert/update/delete
ltable->insertTuple(tuple);
//step 4: send response
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_find(logtable<datatuple> * ltable, int fd, datatuple * tuple) {
//find the tuple
datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
#ifdef STATS_ENABLED
if(dt == 0) {
DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple.key()).c_str());
} else if( dt->datalen() != 1024) {
DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple.key()).c_str(),
dt->datalen);
if(datatuple::compare(tuple->key(), tuple->keylen(), dt->key(), dt->keylen()) != 0) {
DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple.key()).c_str(),
datatuple::key_to_str(dt->key).c_str());
}
}
#endif
bool dt_needs_free;
if(dt == 0) //tuple does not exist.
{
dt = tuple;
dt->setDelete();
dt_needs_free = false;
} else {
dt_needs_free = true;
}
DEBUG(stderr, "find result: %s\n", dt->isDelete() ? "not found" : "found");
//send the reply code
int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
if(!err) {
//send the tuple
err = writetupletosocket(fd, dt);
}
if(!err) {
writeendofiteratortosocket(fd);
}
//free datatuple
if(dt_needs_free) {
datatuple::freetuple(dt);
}
return err;
}
static inline int op_scan(logtable<datatuple> * ltable, int fd, datatuple * tuple, datatuple * tuple2, size_t limit) {
size_t count = 0;
int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
if(!err) {
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(ltable, tuple);
datatuple * t;
while(!err && (t = itr->getnext())) {
if(tuple2) { // are we at the end of range?
if(datatuple::compare_obj(t, tuple2) >= 0) {
datatuple::freetuple(t);
break;
}
}
err = writetupletosocket(fd, t);
datatuple::freetuple(t);
count ++;
if(count == limit) { break; } // did we hit limit?
}
delete itr;
}
if(!err) { writeendofiteratortosocket(fd); }
return err;
}
static inline int op_flush(logtable<datatuple> * ltable, int fd) {
ltable->flushTable();
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_shutdown(logtable<datatuple> * ltable, int fd) {
// XXX
return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR);
}
static inline int op_stat_space_usage(logtable<datatuple> * ltable, int fd) {
int xid = Tbegin();
rwlc_readlock(ltable->header_mut);
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
pageid_t * datapage_c1_regions = ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
pageid_t * tree_c1_regions = ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(ltable->get_tree_c1_mergeable()) {
tree_c1_mergeable_regions = ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count);
*/
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(ltable->get_tree_c1_mergeable()) {
ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
free(datapage_c1_regions);
free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(internal_c1_regions);
free(internal_c1_mergeable_regions);
free(internal_c2_regions);
uint64_t treesize = PAGE_SIZE *
( ( datapage_c1_region_count * datapage_c1_region_length )
+ ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
+ ( datapage_c2_region_count * datapage_c2_region_length)
+ ( internal_c1_region_count * internal_c1_region_length )
+ ( internal_c1_mergeable_region_count * internal_c1_mergeable_region_length )
+ ( internal_c2_region_count * internal_c2_region_length) );
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
TregionReadBoundaryTag(xid, pid, &tag);
uint64_t max_off = 0;
do {
max_off = pid + tag.size;
;
} while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
rwlc_unlock(ltable->header_mut);
Tcommit(xid);
uint64_t filesize = max_off * PAGE_SIZE;
datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize));
DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize);
int err = 0;
if(!err){ err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES); }
if(!err){ err = writetupletosocket(fd, tup); }
if(!err){ err = writeendofiteratortosocket(fd); }
datatuple::freetuple(tup);
return err;
}
static inline int op_stat_perf_report(logtable<datatuple> * ltable, int fd) {
}
static inline int op_stat_histogram(logtable<datatuple> * ltable, int fd, size_t limit) {
if(limit < 3) {
return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
}
int xid = Tbegin();
RegionAllocator * ro_alloc = new RegionAllocator();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, ltable->get_tree_c2()->get_root_rid());
size_t count = 0;
int err = 0;
while(it->next()) { count++; }
it->close();
delete(it);
uint64_t stride;
if(count > limit) {
stride = count / (limit-1);
stride++; // this way, we truncate the last bucket instead of occasionally creating a tiny last bucket.
} else {
stride = 1;
}
datatuple * tup = datatuple::create(&stride, sizeof(stride));
if(!err) { err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES); }
if(!err) { err = writetupletosocket(fd, tup); }
datatuple::freetuple(tup);
size_t cur_stride = 0;
size_t i = 0;
it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
while(it->next()) {
i++;
if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
byte * key;
size_t keylen= it->key(&key);
tup = datatuple::create(key, keylen);
if(!err) { err = writetupletosocket(fd, tup); }
datatuple::freetuple(tup);
cur_stride = stride;
}
cur_stride--;
}
it->close();
delete(it);
delete(ro_alloc);
if(!err){ err = writeendofiteratortosocket(fd); }
Tcommit(xid);
return err;
}
static inline int op_dbg_blockmap(logtable<datatuple> * ltable, int fd) {
// produce a list of stasis regions
int xid = Tbegin();
rwlc_readlock(ltable->header_mut);
// produce a list of regions used by current tree components
/* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t * datapage_c1_regions = ltable->get_tree_c1()->get_datapage_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = ltable->get_tree_c1_mergeable()->get_datapage_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = ltable->get_tree_c2()->get_datapage_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); */
/* pageid_t * tree_c1_regions = ltable->get_tree_c1()->get_internal_node_alloc()->list_regions(xid, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(ltable->get_tree_c1_mergeable()) {
tree_c1_mergeable_regions = ltable->get_tree_c1_mergeable()->get_internal_node_alloc()->list_regions(xid, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = ltable->get_tree_c2()->get_internal_node_alloc()->list_regions(xid, &tree_c2_region_length, &tree_c2_region_count); */
pageid_t internal_c1_region_length, internal_c1_mergeable_region_length = 0, internal_c2_region_length;
pageid_t internal_c1_region_count, internal_c1_mergeable_region_count = 0, internal_c2_region_count;
pageid_t *internal_c1_regions, *internal_c1_mergeable_regions = NULL, *internal_c2_regions;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t *datapage_c1_regions, *datapage_c1_mergeable_regions = NULL, *datapage_c2_regions;
ltable->get_tree_c1()->list_regions(xid,
&internal_c1_region_length, &internal_c1_region_count, &internal_c1_regions,
&datapage_c1_region_length, &datapage_c1_region_count, &datapage_c1_regions);
if(ltable->get_tree_c1_mergeable()) {
ltable->get_tree_c1_mergeable()->list_regions(xid,
&internal_c1_mergeable_region_length, &internal_c1_mergeable_region_count, &internal_c1_mergeable_regions,
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count, &datapage_c1_mergeable_regions);
}
ltable->get_tree_c2()->list_regions(xid,
&internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
&datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
rwlc_unlock(ltable->header_mut);
Tcommit(xid);
printf("C1 Datapage Regions (each is %lld pages long):\n", datapage_c1_region_length);
for(pageid_t i = 0; i < datapage_c1_region_count; i++) {
printf("%lld ", datapage_c1_regions[i]);
}
printf("\nC1 Internal Node Regions (each is %lld pages long):\n", internal_c1_region_length);
for(pageid_t i = 0; i < internal_c1_region_count; i++) {
printf("%lld ", internal_c1_regions[i]);
}
printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length);
for(pageid_t i = 0; i < datapage_c2_region_count; i++) {
printf("%lld ", datapage_c2_regions[i]);
}
printf("\nC2 Internal Node Regions (each is %lld pages long):\n", internal_c2_region_length);
for(pageid_t i = 0; i < internal_c2_region_count; i++) {
printf("%lld ", internal_c2_regions[i]);
}
printf("\nStasis Region Map\n");
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
TregionReadBoundaryTag(xid, pid, &tag);
pageid_t max_off = 0;
bool done;
do {
max_off = pid + tag.size;
// print tag.
printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
} while(!done);
printf("\n");
printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
PAGE_SIZE * (internal_c1_region_length * internal_c1_region_count
+ internal_c1_mergeable_region_length * internal_c1_mergeable_region_count
+ internal_c2_region_length * internal_c2_region_count
+ datapage_c1_region_length * datapage_c1_region_count
+ datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
+ datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024),
(PAGE_SIZE * max_off) / (1024*1024));
free(datapage_c1_regions);
if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(internal_c1_regions);
if(internal_c1_mergeable_regions) free(internal_c1_mergeable_regions);
free(internal_c2_regions);
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_dbg_drop_database(logtable<datatuple> * ltable, int fd) {
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(ltable);
datatuple * del;
fprintf(stderr, "DROPPING DATABASE...\n");
long long n = 0;
while((del = itr->getnext())) {
if(!del->isDelete()) {
del->setDelete();
ltable->insertTuple(del);
n++;
if(!(n % 1000)) {
printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout);
}
} else {
n++;
if(!(n % 1000)) {
printf("? %lld %s\n", n, (char*)del->key()); fflush(stdout);
}
}
datatuple::freetuple(del);
}
delete itr;
fprintf(stderr, "...DROP DATABASE COMPLETE\n");
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_dbg_noop(logtable<datatuple> * ltable, int fd) {
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * ltable, int fd) {
int err = 0;
if(opcode == OP_INSERT)
{
err = op_insert(ltable, fd, tuple);
}
else if(opcode == OP_FIND)
{
err = op_find(ltable, fd, tuple);
}
else if(opcode == OP_SCAN)
{
size_t limit = readcountfromsocket(fd, &err);
if(!err) { err = op_scan(ltable, fd, tuple, tuple2, limit); }
}
else if(opcode == OP_FLUSH)
{
err = op_flush(ltable, fd);
}
else if(opcode == OP_SHUTDOWN)
{
err = op_shutdown(ltable, fd);
}
else if(opcode == OP_STAT_SPACE_USAGE)
{
err = op_stat_space_usage(ltable, fd);
}
else if(opcode == OP_STAT_PERF_REPORT)
{
err = op_stat_perf_report(ltable, fd);
}
else if(opcode == OP_STAT_HISTOGRAM)
{
size_t limit = readcountfromsocket(fd, &err);
err = op_stat_histogram(ltable, fd, limit);
}
else if(opcode == OP_DBG_BLOCKMAP)
{
err = op_dbg_blockmap(ltable, fd);
}
else if(opcode == OP_DBG_DROP_DATABASE)
{
err = op_dbg_drop_database(ltable, fd);
}
else if(opcode == OP_DBG_NOOP) {
err = op_dbg_noop(ltable, fd);
}
return err;
}

16
requestDispatch.h Normal file
View file

@ -0,0 +1,16 @@
/*
* requestDispatch.h
*
* Created on: Aug 11, 2010
* Author: sears
*/
#ifndef REQUESTDISPATCH_H_
#define REQUESTDISPATCH_H_
#include "network.h"
#include "datatuple.h"
#include "logstore.h"
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * logstore, int fd);
#endif /* REQUESTDISPATCH_H_ */