From 7738b67bc05e95f81ffe8abadd7e40c2829c96e8 Mon Sep 17 00:00:00 2001 From: sears Date: Fri, 19 Feb 2010 00:34:41 +0000 Subject: [PATCH] Implemented network iterator protocol. It hasn't been tested or hooked up to anything because logstore iterators don't exist yet. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@589 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- datatuple.h | 2 +- logserver.cpp | 22 +++++++++++++++++++--- network.h | 44 ++++++++++++++++++++++++++++++++++++-------- tcpclient.cpp | 36 +++++++----------------------------- 4 files changed, 63 insertions(+), 41 deletions(-) diff --git a/datatuple.h b/datatuple.h index 41899ff..d1449de 100644 --- a/datatuple.h +++ b/datatuple.h @@ -12,8 +12,8 @@ public: typedef uint32_t len_t ; typedef unsigned char* key_t ; typedef unsigned char* data_t ; -private: static const len_t DELETE = ((len_t)0) - 1; +private: len_t datalen_; byte* key_; byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_. diff --git a/logserver.cpp b/logserver.cpp index 0931476..eb1541d 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -436,12 +436,11 @@ void * thread_work_fn( void * args) continue; } + int err; //step 2: read the tuple from client - datatuple * tuple = readtuplefromsocket(*(item->data->workitem)); + datatuple * tuple = readtuplefromsocket(*(item->data->workitem), &err); //step 3: process the tuple - - int err = 0; if(opcode == OP_INSERT) { @@ -492,6 +491,23 @@ void * thread_work_fn( void * args) datatuple::freetuple(dt); } } + else if(opcode == OP_SCAN) + { + datatuple * end_tuple; + size_t limit; + if(!err) { end_tuple = readtuplefromsocket(*(item->data->workitem), &err); } + if(!err) { limit = readcountfromsocket(*(item->data->workitem), &err); } + if(!err) { + treeIterator * itr; +// if(tuple) { +// itr = new treeIterator(item->data->ltable, *tuple); +// } else { +// itr = new treeIterator(item->data->ltable); +// } + abort(); + delete itr; + } + } else if(opcode == OP_DBG_BLOCKMAP) { // produce a list of stasis regions diff --git a/network.h b/network.h index b40d5ac..1617b75 100644 --- a/network.h +++ b/network.h @@ -24,12 +24,12 @@ static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3; static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; static const network_op_t OP_INSERT = 8; // Create, Update, Delete static const network_op_t OP_FIND = 9; // Read +static const network_op_t OP_SCAN = 11; +static const network_op_t OP_DONE = 12; // Please close the connection. -static const network_op_t OP_DONE = 10; // Please close the connection. +static const network_op_t OP_DBG_BLOCKMAP = 13; -static const network_op_t OP_DBG_BLOCKMAP = 11; - -static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 11; +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 13; //error codes static const network_op_t LOGSTORE_FIRST_ERROR = 28; @@ -137,17 +137,34 @@ static inline int writeoptosocket(int sockd, network_op_t op) { return writetosocket(sockd, &op, sizeof(network_op_t)); } -static inline datatuple* readtuplefromsocket(int sockd) { +/** + Iterator wire format: + + LOGSTORE_RESPONSE_SENDING_TUPLES + TUPLE + TUPLE + TUPLE + datatuple::DELETE + + */ + +/** + @param sockd The socket. + @param error will be set to zero on succes, a logstore error number on failure + @return a datatuple, or NULL. + */ +static inline datatuple* readtuplefromsocket(int sockd, int * err) { datatuple::len_t keylen, datalen, buflen; - if( readfromsocket(sockd, &keylen, sizeof(keylen)) ) return NULL; - if( readfromsocket(sockd, &datalen, sizeof(datalen)) ) return NULL; + if(( *err = readfromsocket(sockd, &keylen, sizeof(keylen)) )) return NULL; + if(keylen == datatuple::DELETE) return NULL; // *err is zero. + if(( *err = readfromsocket(sockd, &datalen, sizeof(datalen)) )) return NULL; buflen = datatuple::length_from_header(keylen, datalen); byte* bytes = (byte*) malloc(buflen); - if( readfromsocket(sockd, bytes, buflen) ) return NULL; + if(( *err = readfromsocket(sockd, bytes, buflen) )) return NULL; return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. } @@ -163,5 +180,16 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) { return 0; } +static inline uint64_t readcountfromsocket(int sockd, int *err) { + uint64_t ret; + *err = readfromsocket(sockd, &ret, sizeof(ret)); + return ret; +} +static inline int writecounttosocket(int sockd, uint64_t count) { + return writetosocket(sockd, &count, sizeof(count)); +} +static inline int writeendofiteratortosocket(int sockd) { + return writetosocket(sockd, &datatuple::DELETE, sizeof(datatuple::DELETE)); +} #endif /* NETWORK_H_ */ diff --git a/tcpclient.cpp b/tcpclient.cpp index a90c378..c62c297 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -20,8 +20,6 @@ #include "datatuple.h" #include "network.h" -// const char *appid; -// const char *region; struct logstore_handle_t { char *host; int portnum; @@ -31,27 +29,6 @@ struct logstore_handle_t { int server_socket; }; -//LogStoreDBImpl::LogStoreDBImpl(const TestSettings & testSettings): -// const char *appid, int timeout, const char *region, int portnum){ -// host_(testSettings.host()), -// appid_(testSettings.appID()), -// timeout_(testSettings.timeout()), -// region_(testSettings.myRegion()), -// routerLatency_(0.0), -// suLatency_(0.0) -// const std::string& appid_; -// const int timeout_; -// const std::string& region_; -// -// int portnum; -// -// int server_socket; -// -// struct sockaddr_in serveraddr; -// struct hostent *server; -// ret->server_socket = -1; -// portnum = 32432; //this should be an argument. - logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) { logstore_handle_t *ret = (logstore_handle_t*) malloc(sizeof(*ret)); ret->host = strdup(host); @@ -85,9 +62,6 @@ static inline void close_conn(logstore_handle_t *l) { } datatuple * logstore_client_op(logstore_handle_t *l, -// int *server_socket, -// struct sockaddr_in serveraddr, -// struct hostent *server, uint8_t opcode, datatuple * tuple) { @@ -141,9 +115,13 @@ logstore_client_op(logstore_handle_t *l, datatuple * ret; if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) - { - ret = readtuplefromsocket(l->server_socket); - + { int err; + uint64_t count = 0; // XXX + while(( ret = readtuplefromsocket(l->server_socket, &err) )) { + if(err) { close_conn(l); return 0; } + count++; + } + printf("return count: %lld\n", count); } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { ret = tuple; } else {