Implemented network iterator protocol. It hasn't been tested or hooked up to anything because logstore iterators don't exist yet.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@589 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-19 00:34:41 +00:00
parent ed9d4fc377
commit 7738b67bc0
4 changed files with 63 additions and 41 deletions

View file

@ -12,8 +12,8 @@ public:
typedef uint32_t len_t ; typedef uint32_t len_t ;
typedef unsigned char* key_t ; typedef unsigned char* key_t ;
typedef unsigned char* data_t ; typedef unsigned char* data_t ;
private:
static const len_t DELETE = ((len_t)0) - 1; static const len_t DELETE = ((len_t)0) - 1;
private:
len_t datalen_; len_t datalen_;
byte* key_; byte* key_;
byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_. byte* data_; // aliases key. data_ - 1 should be the \0 terminating key_.

View file

@ -436,13 +436,12 @@ void * thread_work_fn( void * args)
continue; continue;
} }
int err;
//step 2: read the tuple from client //step 2: read the tuple from client
datatuple * tuple = readtuplefromsocket(*(item->data->workitem)); datatuple * tuple = readtuplefromsocket(*(item->data->workitem), &err);
//step 3: process the tuple //step 3: process the tuple
int err = 0;
if(opcode == OP_INSERT) if(opcode == OP_INSERT)
{ {
//insert/update/delete //insert/update/delete
@ -492,6 +491,23 @@ void * thread_work_fn( void * args)
datatuple::freetuple(dt); datatuple::freetuple(dt);
} }
} }
else if(opcode == OP_SCAN)
{
datatuple * end_tuple;
size_t limit;
if(!err) { end_tuple = readtuplefromsocket(*(item->data->workitem), &err); }
if(!err) { limit = readcountfromsocket(*(item->data->workitem), &err); }
if(!err) {
treeIterator<datatuple> * itr;
// if(tuple) {
// itr = new treeIterator<datatuple>(item->data->ltable, *tuple);
// } else {
// itr = new treeIterator<datatuple>(item->data->ltable);
// }
abort();
delete itr;
}
}
else if(opcode == OP_DBG_BLOCKMAP) else if(opcode == OP_DBG_BLOCKMAP)
{ {
// produce a list of stasis regions // produce a list of stasis regions

View file

@ -24,12 +24,12 @@ static const network_op_t LOGSTORE_LAST_RESPONSE_CODE = 3;
static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8; static const network_op_t LOGSTORE_FIRST_REQUEST_CODE = 8;
static const network_op_t OP_INSERT = 8; // Create, Update, Delete static const network_op_t OP_INSERT = 8; // Create, Update, Delete
static const network_op_t OP_FIND = 9; // Read static const network_op_t OP_FIND = 9; // Read
static const network_op_t OP_SCAN = 11;
static const network_op_t OP_DONE = 12; // Please close the connection.
static const network_op_t OP_DONE = 10; // Please close the connection. static const network_op_t OP_DBG_BLOCKMAP = 13;
static const network_op_t OP_DBG_BLOCKMAP = 11; static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 13;
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 11;
//error codes //error codes
static const network_op_t LOGSTORE_FIRST_ERROR = 28; static const network_op_t LOGSTORE_FIRST_ERROR = 28;
@ -137,17 +137,34 @@ static inline int writeoptosocket(int sockd, network_op_t op) {
return writetosocket(sockd, &op, sizeof(network_op_t)); return writetosocket(sockd, &op, sizeof(network_op_t));
} }
static inline datatuple* readtuplefromsocket(int sockd) { /**
Iterator wire format:
LOGSTORE_RESPONSE_SENDING_TUPLES
TUPLE
TUPLE
TUPLE
datatuple::DELETE
*/
/**
@param sockd The socket.
@param error will be set to zero on succes, a logstore error number on failure
@return a datatuple, or NULL.
*/
static inline datatuple* readtuplefromsocket(int sockd, int * err) {
datatuple::len_t keylen, datalen, buflen; datatuple::len_t keylen, datalen, buflen;
if( readfromsocket(sockd, &keylen, sizeof(keylen)) ) return NULL; if(( *err = readfromsocket(sockd, &keylen, sizeof(keylen)) )) return NULL;
if( readfromsocket(sockd, &datalen, sizeof(datalen)) ) return NULL; if(keylen == datatuple::DELETE) return NULL; // *err is zero.
if(( *err = readfromsocket(sockd, &datalen, sizeof(datalen)) )) return NULL;
buflen = datatuple::length_from_header(keylen, datalen); buflen = datatuple::length_from_header(keylen, datalen);
byte* bytes = (byte*) malloc(buflen); byte* bytes = (byte*) malloc(buflen);
if( readfromsocket(sockd, bytes, buflen) ) return NULL; if(( *err = readfromsocket(sockd, bytes, buflen) )) return NULL;
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
} }
@ -163,5 +180,16 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) {
return 0; return 0;
} }
static inline uint64_t readcountfromsocket(int sockd, int *err) {
uint64_t ret;
*err = readfromsocket(sockd, &ret, sizeof(ret));
return ret;
}
static inline int writecounttosocket(int sockd, uint64_t count) {
return writetosocket(sockd, &count, sizeof(count));
}
static inline int writeendofiteratortosocket(int sockd) {
return writetosocket(sockd, &datatuple::DELETE, sizeof(datatuple::DELETE));
}
#endif /* NETWORK_H_ */ #endif /* NETWORK_H_ */

View file

@ -20,8 +20,6 @@
#include "datatuple.h" #include "datatuple.h"
#include "network.h" #include "network.h"
// const char *appid;
// const char *region;
struct logstore_handle_t { struct logstore_handle_t {
char *host; char *host;
int portnum; int portnum;
@ -31,27 +29,6 @@ struct logstore_handle_t {
int server_socket; int server_socket;
}; };
//LogStoreDBImpl::LogStoreDBImpl(const TestSettings & testSettings):
// const char *appid, int timeout, const char *region, int portnum){
// host_(testSettings.host()),
// appid_(testSettings.appID()),
// timeout_(testSettings.timeout()),
// region_(testSettings.myRegion()),
// routerLatency_(0.0),
// suLatency_(0.0)
// const std::string& appid_;
// const int timeout_;
// const std::string& region_;
//
// int portnum;
//
// int server_socket;
//
// struct sockaddr_in serveraddr;
// struct hostent *server;
// ret->server_socket = -1;
// portnum = 32432; //this should be an argument.
logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) { logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) {
logstore_handle_t *ret = (logstore_handle_t*) malloc(sizeof(*ret)); logstore_handle_t *ret = (logstore_handle_t*) malloc(sizeof(*ret));
ret->host = strdup(host); ret->host = strdup(host);
@ -85,9 +62,6 @@ static inline void close_conn(logstore_handle_t *l) {
} }
datatuple * datatuple *
logstore_client_op(logstore_handle_t *l, logstore_client_op(logstore_handle_t *l,
// int *server_socket,
// struct sockaddr_in serveraddr,
// struct hostent *server,
uint8_t opcode, datatuple * tuple) uint8_t opcode, datatuple * tuple)
{ {
@ -141,9 +115,13 @@ logstore_client_op(logstore_handle_t *l,
datatuple * ret; datatuple * ret;
if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES)
{ { int err;
ret = readtuplefromsocket(l->server_socket); uint64_t count = 0; // XXX
while(( ret = readtuplefromsocket(l->server_socket, &err) )) {
if(err) { close_conn(l); return 0; }
count++;
}
printf("return count: %lld\n", count);
} else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) {
ret = tuple; ret = tuple;
} else { } else {