From 747f4df1f38429481f5db7af4fc71c968b76145d Mon Sep 17 00:00:00 2001 From: sears Date: Mon, 1 Mar 2010 21:26:07 +0000 Subject: [PATCH] Reopening existing trees now seems to work (no thorough tests yet). Added some network opcodes for statistics and debugging (not all are implemented yet). Reduce verbosity of server and client library. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@654 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- diskTreeComponent.h | 4 ++ logserver.cpp | 152 +++++++++++++++++++++++++++++++++++------ logstore.cpp | 8 ++- logstore.h | 2 +- network.h | 19 ++++-- server.cpp | 37 ++++++---- tcpclient.cpp | 23 ++++--- util/CMakeLists.txt | 1 + util/drop_database.cpp | 28 +++----- util/dump_blockmap.cpp | 29 +++----- 10 files changed, 211 insertions(+), 92 deletions(-) diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 7196e30..3eb8f61 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -42,6 +42,10 @@ typedef void(*diskTreeComponent_page_deallocator_t)(int, void *); class diskTreeComponent{ public: diskTreeComponent(int xid): region_alloc(new DataPage::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size. + diskTreeComponent(int xid, recordid root, recordid state, recordid dp_state) + : tree_state(state), + root_rec(root), + region_alloc(new DataPage::RegionAllocator(xid, dp_state)) { lastLeaf = -1; } private: recordid create(int xid); public: diff --git a/logserver.cpp b/logserver.cpp index 1f8ea7e..2b349b1 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -398,7 +398,7 @@ void *serverLoop(void *args) char clientip[20]; inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20); - printf("Connection from:\t%s\n", clientip); +// printf("Connection from:\t%s\n", clientip); pthread_mutex_lock(sdata->qlock); @@ -498,7 +498,97 @@ int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t li if(!err) { writeendofiteratortosocket(*(data->workitem)); } return err; } +int op_flush(pthread_data* data) { + data->ltable->flushTable(); + return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS); +} +int op_shutdown(pthread_data* data) { + // XXX + return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR); +} +int op_stat_space_usage(pthread_data* data) { + + int xid = Tbegin(); + + readlock(data->ltable->header_lock, 0); + + pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; + pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; + pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length; + pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count; + + pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count); + + pageid_t * datapage_c1_mergeable_regions = NULL; + if(data->ltable->get_tree_c1_mergeable()) { + datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); + } + pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); + + recordid tree_c1_region_header = data->ltable->get_tree_c1()->get_tree_state(); + pageid_t * tree_c1_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count); + + pageid_t * tree_c1_mergeable_regions = NULL; + if(data->ltable->get_tree_c1_mergeable()) { + recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state(); + tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); + } + + recordid tree_c2_region_header = data->ltable->get_tree_c2()->get_tree_state(); + pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count); + + free(datapage_c1_regions); + free(datapage_c1_mergeable_regions); + free(datapage_c2_regions); + + free(tree_c1_regions); + free(tree_c1_mergeable_regions); + free(tree_c2_regions); + + + uint64_t treesize = PAGE_SIZE * + ( ( datapage_c1_region_count * datapage_c1_region_length ) + + ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length ) + + ( datapage_c2_region_count * datapage_c2_region_length) + + ( tree_c1_region_count * tree_c1_region_length ) + + ( tree_c1_mergeable_region_count * tree_c1_mergeable_region_length ) + + ( tree_c2_region_count * tree_c2_region_length) ); + + boundary_tag tag; + pageid_t pid = ROOT_RECORD.page; + TregionReadBoundaryTag(xid, pid, &tag); + uint64_t max_off = 0; + do { + max_off = pid + tag.size; + ; + } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/)); + + unlock(data->ltable->header_lock); + + Tcommit(xid); + + uint64_t filesize = max_off * PAGE_SIZE; + datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize)); + + DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize); + + int err = 0; + if(!err){ err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); } + if(!err){ err = writetupletosocket(*(data->workitem), tup); } + if(!err){ err = writeendofiteratortosocket(*(data->workitem)); } + + + datatuple::freetuple(tup); + + return err; +} +int op_stat_perf_report(pthread_data* data) { + +} +int op_stat_histogram(pthread_data* data, size_t limit) { + +} int op_dbg_blockmap(pthread_data* data) { // produce a list of stasis regions int xid = Tbegin(); @@ -510,9 +600,9 @@ int op_dbg_blockmap(pthread_data* data) { pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count); pageid_t * datapage_c1_mergeable_regions = NULL; -if(data->ltable->get_tree_c1_mergeable()) { - datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); -} + if(data->ltable->get_tree_c1_mergeable()) { + datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); + } pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length; @@ -523,10 +613,10 @@ if(data->ltable->get_tree_c1_mergeable()) { pageid_t * tree_c1_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count); pageid_t * tree_c1_mergeable_regions = NULL; -if(data->ltable->get_tree_c1_mergeable()) { - recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state(); - tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); -} + if(data->ltable->get_tree_c1_mergeable()) { + recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state(); + tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); + } pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count); unlock(data->ltable->header_lock); @@ -567,7 +657,7 @@ if(data->ltable->get_tree_c1_mergeable()) { printf("\n"); - printf("Tree components are using %lld megabytes. File is using %lld megabytes.", + printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n", PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count + tree_c1_mergeable_region_length * tree_c1_mergeable_region_count + tree_c2_region_length * tree_c2_region_count @@ -626,10 +716,30 @@ int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, size_t limit = readcountfromsocket(*(data->workitem), &err); if(!err) { err = op_scan(data, tuple, tuple2, limit); } } - else if(opcode == OP_DBG_BLOCKMAP) + else if(opcode == OP_FLUSH) + { + err = op_flush(data); + } + else if(opcode == OP_SHUTDOWN) + { + err = op_shutdown(data); + } + else if(opcode == OP_STAT_SPACE_USAGE) + { + err = op_stat_space_usage(data); + } + else if(opcode == OP_STAT_PERF_REPORT) + { + err = op_stat_perf_report(data); + } + else if(opcode == OP_STAT_HISTOGRAM) + { + size_t limit = readcountfromsocket(*(data->workitem), &err); + err = op_stat_histogram(data, limit); + } + else if(opcode == OP_DBG_BLOCKMAP) { err = op_dbg_blockmap(data); - } else if(opcode == OP_DBG_DROP_DATABASE) { @@ -668,7 +778,7 @@ void * thread_work_fn( void * args) network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST); if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { opcode = OP_DONE; - printf("Obsolescent client closed connection uncleanly\n"); + printf("Broken client closed connection uncleanly\n"); } int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure @@ -695,14 +805,18 @@ void * thread_work_fn( void * args) // Deal with old work_queue item by freeing it or putting it back in the queue. if(err) { - perror("could not respond to client"); - - if(true) { // XXX iserror) { - printf("network error. conn closed. (%d, %d, %d)\n", - errno, *(item->data->workitem), item->data->work_queue->size()); + if(opcode != OP_DONE) { + char *msg; + if(-1 != asprintf(&msg, "network error. conn closed. (%d, %d) ", + *(item->data->workitem), item->data->work_queue->size())) { + perror(msg); + free(msg); + } else { + printf("error preparing string for perror!"); + } } else { - printf("client done. conn closed. (%d, %d)\n", - *(item->data->workitem), item->data->work_queue->size()); +// printf("client done. conn closed. (%d, %d)\n", +// *(item->data->workitem), item->data->work_queue->size()); } close(*(item->data->workitem)); diff --git a/logstore.cpp b/logstore.cpp index e06093c..79efa75 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -100,13 +100,19 @@ recordid logtable::allocTable(int xid) return table_rec; } +void logtable::openTable(int xid, recordid rid) { + table_rec = rid; + Tread(xid, table_rec, &tbl_header); + tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state); + tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state); +} void logtable::update_persistent_header(int xid) { tbl_header.c2_root = tree_c2->get_root_rec(); tbl_header.c2_dp_state = tree_c2->get_alloc()->header_rid(); tbl_header.c2_state = tree_c2->get_tree_state(); tbl_header.c1_root = tree_c1->get_root_rec(); - tbl_header.c2_dp_state = tree_c1->get_alloc()->header_rid(); + tbl_header.c1_dp_state = tree_c1->get_alloc()->header_rid(); tbl_header.c1_state = tree_c1->get_tree_state(); Tset(xid, table_rec, &tbl_header); diff --git a/logstore.h b/logstore.h index 8caa0ae..fb07b5e 100644 --- a/logstore.h +++ b/logstore.h @@ -61,7 +61,7 @@ public: //other class functions recordid allocTable(int xid); - + void openTable(int xid, recordid rid); void flushTable(); static void tearDownTree(rbtree_ptr_t t); diff --git a/network.h b/network.h index 5cbdf2b..4da28b3 100644 --- a/network.h +++ b/network.h @@ -34,15 +34,22 @@ static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; static const network_op_t OP_INSERT = 8; // Create, Update, Delete static const network_op_t OP_FIND = 9; // Read -static const network_op_t OP_SCAN = 11; -static const network_op_t OP_DONE = 12; // Please close the connection. +static const network_op_t OP_SCAN = 10; +static const network_op_t OP_DONE = 11; // Please close the connection. +static const network_op_t OP_FLUSH = 12; +static const network_op_t OP_SHUTDOWN = 13; +static const network_op_t OP_STAT_SPACE_USAGE = 14; +static const network_op_t OP_STAT_PERF_REPORT = 15; +static const network_op_t OP_STAT_HISTOGRAM = 16; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality. -static const network_op_t OP_DBG_DROP_DATABASE = 13; -static const network_op_t OP_DBG_BLOCKMAP = 14; -static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 14; + +static const network_op_t OP_DBG_DROP_DATABASE = 17; +static const network_op_t OP_DBG_BLOCKMAP = 18; +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 18; //error codes -static const network_op_t LOGSTORE_FIRST_ERROR = 28; +static const network_op_t LOGSTORE_FIRST_ERROR = 27; +static const network_op_t LOGSTORE_UNIMPLEMENTED_ERROR = 27; static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error. static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request diff --git a/server.cpp b/server.cpp index 9d7f418..1c2eec4 100644 --- a/server.cpp +++ b/server.cpp @@ -27,19 +27,19 @@ merge_scheduler *mscheduler=0; }*/ void terminate (int param) { - printf ("Stopping server...\n"); - lserver->stopserver(); - delete lserver; + printf ("Stopping server...\n"); + lserver->stopserver(); + delete lserver; - printf("Stopping merge threads...\n"); - mscheduler->shutdown(); - delete mscheduler; - - printf("Deinitializing stasis...\n"); - fflush(stdout); - diskTreeComponent::deinit_stasis(); - - exit(0); + printf("Stopping merge threads...\n"); + mscheduler->shutdown(); + delete mscheduler; + + printf("Deinitializing stasis...\n"); + fflush(stdout); + diskTreeComponent::deinit_stasis(); + + exit(0); } void initialize_server() @@ -59,8 +59,17 @@ void initialize_server() int pcount = 40; ltable.set_fixed_page_count(pcount); - - recordid table_root = ltable.allocTable(xid); + recordid table_root = ROOT_RECORD; + if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { + printf("Creating empty logstore\n"); + table_root = ltable.allocTable(xid); + assert(table_root.page == ROOT_RECORD.page && + table_root.slot == ROOT_RECORD.slot); + } else { + printf("Opened existing logstore\n"); + table_root.size = TrecordSize(xid, ROOT_RECORD); + ltable.openTable(xid, table_root); + } Tcommit(xid); diff --git a/tcpclient.cpp b/tcpclient.cpp index ceed3fc..92bfc8b 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -19,6 +19,9 @@ #include "tcpclient.h" #include "datatuple.h" #include "network.h" +extern "C" { + #define DEBUG(...) /* */ +} struct logstore_handle_t { char *host; @@ -50,13 +53,13 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time (char *)&ret->serveraddr.sin_addr.s_addr, ret->server->h_length); ret->serveraddr.sin_port = htons(ret->portnum); - printf("LogStore start\n"); + DEBUG("LogStore start\n"); return ret; } static inline void close_conn(logstore_handle_t *l) { - printf("read/write err.. conn closed.\n"); + perror("read/write err.. conn closed.\n"); close(l->server_socket); //close the connection l->server_socket = -1; } @@ -71,8 +74,8 @@ logstore_client_op(logstore_handle_t *l, if (l->server_socket < 0) { - printf("ERROR opening socket.\n"); - return 0; + perror("ERROR opening socket.\n"); + return 0; } @@ -85,7 +88,7 @@ logstore_client_op(logstore_handle_t *l, sizeof(int)); /* length of option value */ if (result < 0) { - printf("ERROR on setting socket option TCP_NODELAY.\n"); + perror("ERROR on setting socket option TCP_NODELAY.\n"); return 0; } @@ -93,11 +96,11 @@ logstore_client_op(logstore_handle_t *l, /* connect: create a connection with the server */ if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0) { - printf("ERROR connecting\n"); + perror("ERROR connecting\n"); return 0; } - printf("sock opened %d\n", l->server_socket); + DEBUG("sock opened %d\n", l->server_socket); } @@ -132,9 +135,9 @@ logstore_client_op(logstore_handle_t *l, if(err) { close_conn(l); return 0; } count++; } - if(count > 1) { printf("return count: %lld\n", count); } + if(count > 1) { fprintf(stderr, "XXX return count: %lld but iterators are not handled by the logstore_client_op api\n", count); } } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { - ret = tuple; + ret = tuple ? tuple : datatuple::create("", 1); } else { assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above ret = 0; @@ -149,7 +152,7 @@ int logstore_client_close(logstore_handle_t* l) { writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t)); close(l->server_socket); - printf("socket closed %d\n.", l->server_socket); + DEBUG("socket closed %d\n.", l->server_socket); } free(l->host); free(l); diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt index e46c18b..dcf1c93 100644 --- a/util/CMakeLists.txt +++ b/util/CMakeLists.txt @@ -1,2 +1,3 @@ CREATE_CLIENT_EXECUTABLE(dump_blockmap) CREATE_CLIENT_EXECUTABLE(drop_database) +CREATE_CLIENT_EXECUTABLE(space_usage) diff --git a/util/drop_database.cpp b/util/drop_database.cpp index 990c207..bc9fadb 100644 --- a/util/drop_database.cpp +++ b/util/drop_database.cpp @@ -8,34 +8,22 @@ #include "../tcpclient.h" #include "../network.h" #include "../datatuple.h" - void usage(char * argv[]) { fprintf(stderr, "usage %s [host [port]]\n", argv[0]); } - +#include "util_main.h" int main(int argc, char * argv[]) { - bool ok = true; - int svrport = 32432; - char * svrname = "localhost"; - if(argc == 3) { - svrport = atoi(argv[2]); - } - if(argc == 2 || argc == 3) { - svrname = argv[1]; - } - if(!ok || argc > 3) { - usage(argv); return 1; - } - - logstore_handle_t * l = logstore_client_open(svrname, svrport, 100); - - if(l == NULL) { perror("Couldn't open connection"); return 2; } + logstore_handle_t * l = util_open_conn(argc, argv); datatuple * ret = logstore_client_op(l, OP_DBG_DROP_DATABASE); if(ret == NULL) { - perror("Drop database failed."); return 3; + perror("Drop database failed"); return 3; + } else { + datatuple::freetuple(ret); } logstore_client_close(l); - printf("Success\n"); + printf("Drop database succeeded\n"); return 0; + } + diff --git a/util/dump_blockmap.cpp b/util/dump_blockmap.cpp index a1a3ced..f1dc3fa 100644 --- a/util/dump_blockmap.cpp +++ b/util/dump_blockmap.cpp @@ -12,31 +12,18 @@ void usage(char * argv[]) { fprintf(stderr, "usage %s [host [port]]\n", argv[0]); } - +#include "util_main.h" int main(int argc, char * argv[]) { - bool ok = true; - int svrport = 32432; - char * svrname = "localhost"; - if(argc == 3) { - svrport = atoi(argv[2]); - } - if(argc == 2 || argc == 3) { - svrname = argv[1]; - } - if(!ok || argc > 3) { - usage(argv); return 1; - } + int op = OP_DBG_BLOCKMAP; + logstore_handle_t * l = util_open_conn(argc, argv); - logstore_handle_t * l = logstore_client_open(svrname, svrport, 100); - - if(l == NULL) { perror("Couldn't open connection"); return 2; } - - datatuple * t = datatuple::create("", 1); - datatuple * ret = logstore_client_op(l, OP_DBG_BLOCKMAP, t); + datatuple * ret = logstore_client_op(l, op); if(ret == NULL) { - perror("Blockmap request failed."); return 3; + perror("Dump blockmap failed."); return 3; + } else { + datatuple::freetuple(ret); } logstore_client_close(l); - printf("Success\n"); + printf("Dump blockmap succeeded\n"); return 0; }