Reopening existing trees now seems to work (no thorough tests yet). Added some network opcodes for statistics and debugging (not all are implemented yet). Reduce verbosity of server and client library.
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@654 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
27720dddbe
commit
747f4df1f3
10 changed files with 211 additions and 92 deletions
|
@ -42,6 +42,10 @@ typedef void(*diskTreeComponent_page_deallocator_t)(int, void *);
|
||||||
class diskTreeComponent{
|
class diskTreeComponent{
|
||||||
public:
|
public:
|
||||||
diskTreeComponent(int xid): region_alloc(new DataPage<datatuple>::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size.
|
diskTreeComponent(int xid): region_alloc(new DataPage<datatuple>::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size.
|
||||||
|
diskTreeComponent(int xid, recordid root, recordid state, recordid dp_state)
|
||||||
|
: tree_state(state),
|
||||||
|
root_rec(root),
|
||||||
|
region_alloc(new DataPage<datatuple>::RegionAllocator(xid, dp_state)) { lastLeaf = -1; }
|
||||||
private:
|
private:
|
||||||
recordid create(int xid);
|
recordid create(int xid);
|
||||||
public:
|
public:
|
||||||
|
|
144
logserver.cpp
144
logserver.cpp
|
@ -398,7 +398,7 @@ void *serverLoop(void *args)
|
||||||
|
|
||||||
char clientip[20];
|
char clientip[20];
|
||||||
inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
|
inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
|
||||||
printf("Connection from:\t%s\n", clientip);
|
// printf("Connection from:\t%s\n", clientip);
|
||||||
|
|
||||||
pthread_mutex_lock(sdata->qlock);
|
pthread_mutex_lock(sdata->qlock);
|
||||||
|
|
||||||
|
@ -498,7 +498,97 @@ int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t li
|
||||||
if(!err) { writeendofiteratortosocket(*(data->workitem)); }
|
if(!err) { writeendofiteratortosocket(*(data->workitem)); }
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
int op_flush(pthread_data* data) {
|
||||||
|
data->ltable->flushTable();
|
||||||
|
return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
|
||||||
|
}
|
||||||
|
int op_shutdown(pthread_data* data) {
|
||||||
|
// XXX
|
||||||
|
return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR);
|
||||||
|
}
|
||||||
|
int op_stat_space_usage(pthread_data* data) {
|
||||||
|
|
||||||
|
|
||||||
|
int xid = Tbegin();
|
||||||
|
|
||||||
|
readlock(data->ltable->header_lock, 0);
|
||||||
|
|
||||||
|
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
|
||||||
|
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
|
||||||
|
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
|
||||||
|
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
|
||||||
|
|
||||||
|
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
|
||||||
|
|
||||||
|
pageid_t * datapage_c1_mergeable_regions = NULL;
|
||||||
|
if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
|
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
|
||||||
|
}
|
||||||
|
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
|
||||||
|
|
||||||
|
recordid tree_c1_region_header = data->ltable->get_tree_c1()->get_tree_state();
|
||||||
|
pageid_t * tree_c1_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
|
||||||
|
|
||||||
|
pageid_t * tree_c1_mergeable_regions = NULL;
|
||||||
|
if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
|
recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state();
|
||||||
|
tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
recordid tree_c2_region_header = data->ltable->get_tree_c2()->get_tree_state();
|
||||||
|
pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
|
||||||
|
|
||||||
|
free(datapage_c1_regions);
|
||||||
|
free(datapage_c1_mergeable_regions);
|
||||||
|
free(datapage_c2_regions);
|
||||||
|
|
||||||
|
free(tree_c1_regions);
|
||||||
|
free(tree_c1_mergeable_regions);
|
||||||
|
free(tree_c2_regions);
|
||||||
|
|
||||||
|
|
||||||
|
uint64_t treesize = PAGE_SIZE *
|
||||||
|
( ( datapage_c1_region_count * datapage_c1_region_length )
|
||||||
|
+ ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
|
||||||
|
+ ( datapage_c2_region_count * datapage_c2_region_length)
|
||||||
|
+ ( tree_c1_region_count * tree_c1_region_length )
|
||||||
|
+ ( tree_c1_mergeable_region_count * tree_c1_mergeable_region_length )
|
||||||
|
+ ( tree_c2_region_count * tree_c2_region_length) );
|
||||||
|
|
||||||
|
boundary_tag tag;
|
||||||
|
pageid_t pid = ROOT_RECORD.page;
|
||||||
|
TregionReadBoundaryTag(xid, pid, &tag);
|
||||||
|
uint64_t max_off = 0;
|
||||||
|
do {
|
||||||
|
max_off = pid + tag.size;
|
||||||
|
;
|
||||||
|
} while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
|
||||||
|
|
||||||
|
unlock(data->ltable->header_lock);
|
||||||
|
|
||||||
|
Tcommit(xid);
|
||||||
|
|
||||||
|
uint64_t filesize = max_off * PAGE_SIZE;
|
||||||
|
datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize));
|
||||||
|
|
||||||
|
DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize);
|
||||||
|
|
||||||
|
int err = 0;
|
||||||
|
if(!err){ err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
|
||||||
|
if(!err){ err = writetupletosocket(*(data->workitem), tup); }
|
||||||
|
if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
|
||||||
|
|
||||||
|
|
||||||
|
datatuple::freetuple(tup);
|
||||||
|
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
int op_stat_perf_report(pthread_data* data) {
|
||||||
|
|
||||||
|
}
|
||||||
|
int op_stat_histogram(pthread_data* data, size_t limit) {
|
||||||
|
|
||||||
|
}
|
||||||
int op_dbg_blockmap(pthread_data* data) {
|
int op_dbg_blockmap(pthread_data* data) {
|
||||||
// produce a list of stasis regions
|
// produce a list of stasis regions
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
@ -510,9 +600,9 @@ int op_dbg_blockmap(pthread_data* data) {
|
||||||
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
|
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
|
||||||
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
|
pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
|
||||||
pageid_t * datapage_c1_mergeable_regions = NULL;
|
pageid_t * datapage_c1_mergeable_regions = NULL;
|
||||||
if(data->ltable->get_tree_c1_mergeable()) {
|
if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
|
datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
|
||||||
}
|
}
|
||||||
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
|
pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
|
||||||
|
|
||||||
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
|
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
|
||||||
|
@ -523,10 +613,10 @@ if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
|
|
||||||
pageid_t * tree_c1_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
|
pageid_t * tree_c1_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
|
||||||
pageid_t * tree_c1_mergeable_regions = NULL;
|
pageid_t * tree_c1_mergeable_regions = NULL;
|
||||||
if(data->ltable->get_tree_c1_mergeable()) {
|
if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state();
|
recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state();
|
||||||
tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
|
tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
|
||||||
}
|
}
|
||||||
pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
|
pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
|
||||||
unlock(data->ltable->header_lock);
|
unlock(data->ltable->header_lock);
|
||||||
|
|
||||||
|
@ -567,7 +657,7 @@ if(data->ltable->get_tree_c1_mergeable()) {
|
||||||
|
|
||||||
printf("\n");
|
printf("\n");
|
||||||
|
|
||||||
printf("Tree components are using %lld megabytes. File is using %lld megabytes.",
|
printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
|
||||||
PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count
|
PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count
|
||||||
+ tree_c1_mergeable_region_length * tree_c1_mergeable_region_count
|
+ tree_c1_mergeable_region_length * tree_c1_mergeable_region_count
|
||||||
+ tree_c2_region_length * tree_c2_region_count
|
+ tree_c2_region_length * tree_c2_region_count
|
||||||
|
@ -625,11 +715,31 @@ int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2,
|
||||||
{
|
{
|
||||||
size_t limit = readcountfromsocket(*(data->workitem), &err);
|
size_t limit = readcountfromsocket(*(data->workitem), &err);
|
||||||
if(!err) { err = op_scan(data, tuple, tuple2, limit); }
|
if(!err) { err = op_scan(data, tuple, tuple2, limit); }
|
||||||
|
}
|
||||||
|
else if(opcode == OP_FLUSH)
|
||||||
|
{
|
||||||
|
err = op_flush(data);
|
||||||
|
}
|
||||||
|
else if(opcode == OP_SHUTDOWN)
|
||||||
|
{
|
||||||
|
err = op_shutdown(data);
|
||||||
|
}
|
||||||
|
else if(opcode == OP_STAT_SPACE_USAGE)
|
||||||
|
{
|
||||||
|
err = op_stat_space_usage(data);
|
||||||
|
}
|
||||||
|
else if(opcode == OP_STAT_PERF_REPORT)
|
||||||
|
{
|
||||||
|
err = op_stat_perf_report(data);
|
||||||
|
}
|
||||||
|
else if(opcode == OP_STAT_HISTOGRAM)
|
||||||
|
{
|
||||||
|
size_t limit = readcountfromsocket(*(data->workitem), &err);
|
||||||
|
err = op_stat_histogram(data, limit);
|
||||||
}
|
}
|
||||||
else if(opcode == OP_DBG_BLOCKMAP)
|
else if(opcode == OP_DBG_BLOCKMAP)
|
||||||
{
|
{
|
||||||
err = op_dbg_blockmap(data);
|
err = op_dbg_blockmap(data);
|
||||||
|
|
||||||
}
|
}
|
||||||
else if(opcode == OP_DBG_DROP_DATABASE)
|
else if(opcode == OP_DBG_DROP_DATABASE)
|
||||||
{
|
{
|
||||||
|
@ -668,7 +778,7 @@ void * thread_work_fn( void * args)
|
||||||
network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST);
|
network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST);
|
||||||
if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
|
if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
|
||||||
opcode = OP_DONE;
|
opcode = OP_DONE;
|
||||||
printf("Obsolescent client closed connection uncleanly\n");
|
printf("Broken client closed connection uncleanly\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure
|
int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure
|
||||||
|
@ -695,14 +805,18 @@ void * thread_work_fn( void * args)
|
||||||
// Deal with old work_queue item by freeing it or putting it back in the queue.
|
// Deal with old work_queue item by freeing it or putting it back in the queue.
|
||||||
|
|
||||||
if(err) {
|
if(err) {
|
||||||
perror("could not respond to client");
|
if(opcode != OP_DONE) {
|
||||||
|
char *msg;
|
||||||
if(true) { // XXX iserror) {
|
if(-1 != asprintf(&msg, "network error. conn closed. (%d, %d) ",
|
||||||
printf("network error. conn closed. (%d, %d, %d)\n",
|
*(item->data->workitem), item->data->work_queue->size())) {
|
||||||
errno, *(item->data->workitem), item->data->work_queue->size());
|
perror(msg);
|
||||||
|
free(msg);
|
||||||
} else {
|
} else {
|
||||||
printf("client done. conn closed. (%d, %d)\n",
|
printf("error preparing string for perror!");
|
||||||
*(item->data->workitem), item->data->work_queue->size());
|
}
|
||||||
|
} else {
|
||||||
|
// printf("client done. conn closed. (%d, %d)\n",
|
||||||
|
// *(item->data->workitem), item->data->work_queue->size());
|
||||||
}
|
}
|
||||||
close(*(item->data->workitem));
|
close(*(item->data->workitem));
|
||||||
|
|
||||||
|
|
|
@ -100,13 +100,19 @@ recordid logtable::allocTable(int xid)
|
||||||
|
|
||||||
return table_rec;
|
return table_rec;
|
||||||
}
|
}
|
||||||
|
void logtable::openTable(int xid, recordid rid) {
|
||||||
|
table_rec = rid;
|
||||||
|
Tread(xid, table_rec, &tbl_header);
|
||||||
|
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state);
|
||||||
|
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state);
|
||||||
|
}
|
||||||
void logtable::update_persistent_header(int xid) {
|
void logtable::update_persistent_header(int xid) {
|
||||||
|
|
||||||
tbl_header.c2_root = tree_c2->get_root_rec();
|
tbl_header.c2_root = tree_c2->get_root_rec();
|
||||||
tbl_header.c2_dp_state = tree_c2->get_alloc()->header_rid();
|
tbl_header.c2_dp_state = tree_c2->get_alloc()->header_rid();
|
||||||
tbl_header.c2_state = tree_c2->get_tree_state();
|
tbl_header.c2_state = tree_c2->get_tree_state();
|
||||||
tbl_header.c1_root = tree_c1->get_root_rec();
|
tbl_header.c1_root = tree_c1->get_root_rec();
|
||||||
tbl_header.c2_dp_state = tree_c1->get_alloc()->header_rid();
|
tbl_header.c1_dp_state = tree_c1->get_alloc()->header_rid();
|
||||||
tbl_header.c1_state = tree_c1->get_tree_state();
|
tbl_header.c1_state = tree_c1->get_tree_state();
|
||||||
|
|
||||||
Tset(xid, table_rec, &tbl_header);
|
Tset(xid, table_rec, &tbl_header);
|
||||||
|
|
|
@ -61,7 +61,7 @@ public:
|
||||||
|
|
||||||
//other class functions
|
//other class functions
|
||||||
recordid allocTable(int xid);
|
recordid allocTable(int xid);
|
||||||
|
void openTable(int xid, recordid rid);
|
||||||
void flushTable();
|
void flushTable();
|
||||||
|
|
||||||
static void tearDownTree(rbtree_ptr_t t);
|
static void tearDownTree(rbtree_ptr_t t);
|
||||||
|
|
19
network.h
19
network.h
|
@ -34,15 +34,22 @@ static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3;
|
||||||
static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8;
|
static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8;
|
||||||
static const network_op_t OP_INSERT = 8; // Create, Update, Delete
|
static const network_op_t OP_INSERT = 8; // Create, Update, Delete
|
||||||
static const network_op_t OP_FIND = 9; // Read
|
static const network_op_t OP_FIND = 9; // Read
|
||||||
static const network_op_t OP_SCAN = 11;
|
static const network_op_t OP_SCAN = 10;
|
||||||
static const network_op_t OP_DONE = 12; // Please close the connection.
|
static const network_op_t OP_DONE = 11; // Please close the connection.
|
||||||
|
static const network_op_t OP_FLUSH = 12;
|
||||||
|
static const network_op_t OP_SHUTDOWN = 13;
|
||||||
|
static const network_op_t OP_STAT_SPACE_USAGE = 14;
|
||||||
|
static const network_op_t OP_STAT_PERF_REPORT = 15;
|
||||||
|
static const network_op_t OP_STAT_HISTOGRAM = 16; // Return N approximately equal size partitions (including split points + cardinalities) N=1 estimates table cardinality.
|
||||||
|
|
||||||
static const network_op_t OP_DBG_DROP_DATABASE = 13;
|
|
||||||
static const network_op_t OP_DBG_BLOCKMAP = 14;
|
static const network_op_t OP_DBG_DROP_DATABASE = 17;
|
||||||
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 14;
|
static const network_op_t OP_DBG_BLOCKMAP = 18;
|
||||||
|
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 18;
|
||||||
|
|
||||||
//error codes
|
//error codes
|
||||||
static const network_op_t LOGSTORE_FIRST_ERROR = 28;
|
static const network_op_t LOGSTORE_FIRST_ERROR = 27;
|
||||||
|
static const network_op_t LOGSTORE_UNIMPLEMENTED_ERROR = 27;
|
||||||
static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF
|
static const network_op_t LOGSTORE_CONN_CLOSED_ERROR = 28; // Unexpected EOF
|
||||||
static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error.
|
static const network_op_t LOGSTORE_SOCKET_ERROR = 29; // The OS returned an error.
|
||||||
static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request
|
static const network_op_t LOGSTORE_REMOTE_ERROR = 30; // The other side didn't like our request
|
||||||
|
|
13
server.cpp
13
server.cpp
|
@ -59,8 +59,17 @@ void initialize_server()
|
||||||
|
|
||||||
int pcount = 40;
|
int pcount = 40;
|
||||||
ltable.set_fixed_page_count(pcount);
|
ltable.set_fixed_page_count(pcount);
|
||||||
|
recordid table_root = ROOT_RECORD;
|
||||||
recordid table_root = ltable.allocTable(xid);
|
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
|
||||||
|
printf("Creating empty logstore\n");
|
||||||
|
table_root = ltable.allocTable(xid);
|
||||||
|
assert(table_root.page == ROOT_RECORD.page &&
|
||||||
|
table_root.slot == ROOT_RECORD.slot);
|
||||||
|
} else {
|
||||||
|
printf("Opened existing logstore\n");
|
||||||
|
table_root.size = TrecordSize(xid, ROOT_RECORD);
|
||||||
|
ltable.openTable(xid, table_root);
|
||||||
|
}
|
||||||
|
|
||||||
Tcommit(xid);
|
Tcommit(xid);
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
#include "tcpclient.h"
|
#include "tcpclient.h"
|
||||||
#include "datatuple.h"
|
#include "datatuple.h"
|
||||||
#include "network.h"
|
#include "network.h"
|
||||||
|
extern "C" {
|
||||||
|
#define DEBUG(...) /* */
|
||||||
|
}
|
||||||
|
|
||||||
struct logstore_handle_t {
|
struct logstore_handle_t {
|
||||||
char *host;
|
char *host;
|
||||||
|
@ -50,13 +53,13 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
|
||||||
(char *)&ret->serveraddr.sin_addr.s_addr, ret->server->h_length);
|
(char *)&ret->serveraddr.sin_addr.s_addr, ret->server->h_length);
|
||||||
ret->serveraddr.sin_port = htons(ret->portnum);
|
ret->serveraddr.sin_port = htons(ret->portnum);
|
||||||
|
|
||||||
printf("LogStore start\n");
|
DEBUG("LogStore start\n");
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void close_conn(logstore_handle_t *l) {
|
static inline void close_conn(logstore_handle_t *l) {
|
||||||
printf("read/write err.. conn closed.\n");
|
perror("read/write err.. conn closed.\n");
|
||||||
close(l->server_socket); //close the connection
|
close(l->server_socket); //close the connection
|
||||||
l->server_socket = -1;
|
l->server_socket = -1;
|
||||||
}
|
}
|
||||||
|
@ -71,7 +74,7 @@ logstore_client_op(logstore_handle_t *l,
|
||||||
|
|
||||||
if (l->server_socket < 0)
|
if (l->server_socket < 0)
|
||||||
{
|
{
|
||||||
printf("ERROR opening socket.\n");
|
perror("ERROR opening socket.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +88,7 @@ logstore_client_op(logstore_handle_t *l,
|
||||||
sizeof(int)); /* length of option value */
|
sizeof(int)); /* length of option value */
|
||||||
if (result < 0)
|
if (result < 0)
|
||||||
{
|
{
|
||||||
printf("ERROR on setting socket option TCP_NODELAY.\n");
|
perror("ERROR on setting socket option TCP_NODELAY.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,11 +96,11 @@ logstore_client_op(logstore_handle_t *l,
|
||||||
/* connect: create a connection with the server */
|
/* connect: create a connection with the server */
|
||||||
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
|
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
|
||||||
{
|
{
|
||||||
printf("ERROR connecting\n");
|
perror("ERROR connecting\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("sock opened %d\n", l->server_socket);
|
DEBUG("sock opened %d\n", l->server_socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -132,9 +135,9 @@ logstore_client_op(logstore_handle_t *l,
|
||||||
if(err) { close_conn(l); return 0; }
|
if(err) { close_conn(l); return 0; }
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if(count > 1) { printf("return count: %lld\n", count); }
|
if(count > 1) { fprintf(stderr, "XXX return count: %lld but iterators are not handled by the logstore_client_op api\n", count); }
|
||||||
} else if(rcode == LOGSTORE_RESPONSE_SUCCESS) {
|
} else if(rcode == LOGSTORE_RESPONSE_SUCCESS) {
|
||||||
ret = tuple;
|
ret = tuple ? tuple : datatuple::create("", 1);
|
||||||
} else {
|
} else {
|
||||||
assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above
|
assert(rcode == LOGSTORE_RESPONSE_FAIL); // if this is an invalid response, we should have noticed above
|
||||||
ret = 0;
|
ret = 0;
|
||||||
|
@ -149,7 +152,7 @@ int logstore_client_close(logstore_handle_t* l) {
|
||||||
writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t));
|
writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t));
|
||||||
|
|
||||||
close(l->server_socket);
|
close(l->server_socket);
|
||||||
printf("socket closed %d\n.", l->server_socket);
|
DEBUG("socket closed %d\n.", l->server_socket);
|
||||||
}
|
}
|
||||||
free(l->host);
|
free(l->host);
|
||||||
free(l);
|
free(l);
|
||||||
|
|
|
@ -1,2 +1,3 @@
|
||||||
CREATE_CLIENT_EXECUTABLE(dump_blockmap)
|
CREATE_CLIENT_EXECUTABLE(dump_blockmap)
|
||||||
CREATE_CLIENT_EXECUTABLE(drop_database)
|
CREATE_CLIENT_EXECUTABLE(drop_database)
|
||||||
|
CREATE_CLIENT_EXECUTABLE(space_usage)
|
||||||
|
|
|
@ -8,34 +8,22 @@
|
||||||
#include "../tcpclient.h"
|
#include "../tcpclient.h"
|
||||||
#include "../network.h"
|
#include "../network.h"
|
||||||
#include "../datatuple.h"
|
#include "../datatuple.h"
|
||||||
|
|
||||||
void usage(char * argv[]) {
|
void usage(char * argv[]) {
|
||||||
fprintf(stderr, "usage %s [host [port]]\n", argv[0]);
|
fprintf(stderr, "usage %s [host [port]]\n", argv[0]);
|
||||||
}
|
}
|
||||||
|
#include "util_main.h"
|
||||||
int main(int argc, char * argv[]) {
|
int main(int argc, char * argv[]) {
|
||||||
bool ok = true;
|
logstore_handle_t * l = util_open_conn(argc, argv);
|
||||||
int svrport = 32432;
|
|
||||||
char * svrname = "localhost";
|
|
||||||
if(argc == 3) {
|
|
||||||
svrport = atoi(argv[2]);
|
|
||||||
}
|
|
||||||
if(argc == 2 || argc == 3) {
|
|
||||||
svrname = argv[1];
|
|
||||||
}
|
|
||||||
if(!ok || argc > 3) {
|
|
||||||
usage(argv); return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);
|
|
||||||
|
|
||||||
if(l == NULL) { perror("Couldn't open connection"); return 2; }
|
|
||||||
|
|
||||||
datatuple * ret = logstore_client_op(l, OP_DBG_DROP_DATABASE);
|
datatuple * ret = logstore_client_op(l, OP_DBG_DROP_DATABASE);
|
||||||
if(ret == NULL) {
|
if(ret == NULL) {
|
||||||
perror("Drop database failed."); return 3;
|
perror("Drop database failed"); return 3;
|
||||||
|
} else {
|
||||||
|
datatuple::freetuple(ret);
|
||||||
}
|
}
|
||||||
logstore_client_close(l);
|
logstore_client_close(l);
|
||||||
printf("Success\n");
|
printf("Drop database succeeded\n");
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,31 +12,18 @@
|
||||||
void usage(char * argv[]) {
|
void usage(char * argv[]) {
|
||||||
fprintf(stderr, "usage %s [host [port]]\n", argv[0]);
|
fprintf(stderr, "usage %s [host [port]]\n", argv[0]);
|
||||||
}
|
}
|
||||||
|
#include "util_main.h"
|
||||||
int main(int argc, char * argv[]) {
|
int main(int argc, char * argv[]) {
|
||||||
bool ok = true;
|
int op = OP_DBG_BLOCKMAP;
|
||||||
int svrport = 32432;
|
logstore_handle_t * l = util_open_conn(argc, argv);
|
||||||
char * svrname = "localhost";
|
|
||||||
if(argc == 3) {
|
|
||||||
svrport = atoi(argv[2]);
|
|
||||||
}
|
|
||||||
if(argc == 2 || argc == 3) {
|
|
||||||
svrname = argv[1];
|
|
||||||
}
|
|
||||||
if(!ok || argc > 3) {
|
|
||||||
usage(argv); return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);
|
datatuple * ret = logstore_client_op(l, op);
|
||||||
|
|
||||||
if(l == NULL) { perror("Couldn't open connection"); return 2; }
|
|
||||||
|
|
||||||
datatuple * t = datatuple::create("", 1);
|
|
||||||
datatuple * ret = logstore_client_op(l, OP_DBG_BLOCKMAP, t);
|
|
||||||
if(ret == NULL) {
|
if(ret == NULL) {
|
||||||
perror("Blockmap request failed."); return 3;
|
perror("Dump blockmap failed."); return 3;
|
||||||
|
} else {
|
||||||
|
datatuple::freetuple(ret);
|
||||||
}
|
}
|
||||||
logstore_client_close(l);
|
logstore_client_close(l);
|
||||||
printf("Success\n");
|
printf("Dump blockmap succeeded\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue