backport client code from ydht load generator

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@543 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-02 23:50:16 +00:00
parent 3cbcb1e274
commit 1725b6569d
9 changed files with 319 additions and 161 deletions

View file

@ -34,6 +34,12 @@ MACRO(CREATE_EXECUTABLE NAME)
TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES})
ENDMACRO(CREATE_EXECUTABLE)
MACRO(CREATE_CLIENT_EXECUTABLE NAME)
ADD_EXECUTABLE(${NAME} ${NAME}.cpp)
TARGET_LINK_LIBRARIES(${NAME} ${CLIENT_LIBRARIES})
ENDMACRO(CREATE_EXECUTABLE)
# Output the config.h file
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/
@ -43,6 +49,8 @@ INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/
# set linker path for this and all subdirs
LINK_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/build/src/stasis ${CMAKE_CURRENT_BINARY_DIR})
SET(CLIENT_LIBRARIES logstore_client)
IF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
SET(COMMON_LIBRARIES logstore stasis m pthread stdc++)
SET(CMAKE_C_FLAGS "-g -Wall -pedantic -std=gnu99 -DPBL_COMPAT -D_FILE_OFFSET_BITS=64 ${CMAKE_C_FLAGS}")
@ -62,5 +70,5 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
ADD_LIBRARY(logstore logserver.cpp logstore.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp)
ADD_LIBRARY(logstore_client tcpclient.cpp datapage.cpp)
CREATE_EXECUTABLE(server)

View file

@ -6,6 +6,8 @@
#include "logstore.h"
#include "network.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
@ -19,19 +21,6 @@
#undef try
//server codes
uint8_t logserver::OP_SUCCESS = 1;
uint8_t logserver::OP_FAIL = 2;
uint8_t logserver::OP_SENDING_TUPLE = 3;
//client codes
uint8_t logserver::OP_FIND = 4;
uint8_t logserver::OP_INSERT = 5;
uint8_t logserver::OP_DONE = 6;
uint8_t logserver::OP_INVALID = 32;
void *serverLoop(void *args);
void logserver::startserver(logtable *ltable)
@ -441,14 +430,14 @@ void * thread_work_fn( void * args)
uint8_t opcode;
ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t));
if(n == 0) {
opcode = logserver::OP_DONE;
opcode = OP_DONE;
n = sizeof(uint8_t);
printf("Obsolescent client closed connection uncleanly\n");
}
assert( n == sizeof(uint8_t));
assert( opcode < logserver::OP_INVALID );
assert( opcode < OP_INVALID );
if( opcode == logserver::OP_DONE ) //close the conn on failure
if( opcode == OP_DONE ) //close the conn on failure
{
pthread_mutex_lock(item->data->qlock);
printf("client done. conn closed. (%d, %d, %d, %d)\n",
@ -489,12 +478,12 @@ void * thread_work_fn( void * args)
//read the key
tuple.key = (byte*) malloc(*tuple.keylen);
logserver::readfromsocket(*(item->data->workitem), (byte*) tuple.key, *tuple.keylen);
readfromsocket(*(item->data->workitem), (char*) tuple.key, *tuple.keylen);
//read the data
if(!tuple.isDelete() && opcode != logserver::OP_FIND)
if(!tuple.isDelete() && opcode != OP_FIND)
{
tuple.data = (byte*) malloc(*tuple.datalen);
logserver::readfromsocket(*(item->data->workitem), (byte*) tuple.data, *tuple.datalen);
readfromsocket(*(item->data->workitem), (char*) tuple.data, *tuple.datalen);
}
else
tuple.data = 0;
@ -503,7 +492,7 @@ void * thread_work_fn( void * args)
//pthread_mutex_lock(item->data->table_lock);
//readlock(item->data->table_lock,0);
if(opcode == logserver::OP_INSERT)
if(opcode == OP_INSERT)
{
//insert/update/delete
item->data->ltable->insertTuple(tuple);
@ -511,12 +500,12 @@ void * thread_work_fn( void * args)
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
//step 4: send response
uint8_t rcode = logserver::OP_SUCCESS;
uint8_t rcode = OP_SUCCESS;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
}
else if(opcode == logserver::OP_FIND)
else if(opcode == OP_FIND)
{
//find the tuple
datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen);
@ -550,12 +539,12 @@ void * thread_work_fn( void * args)
}
//send the reply code
uint8_t rcode = logserver::OP_SENDING_TUPLE;
uint8_t rcode = OP_SENDING_TUPLE;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
//send the tuple
logserver::writetosocket(*(item->data->workitem), (byte*) dt->keylen, dt->byte_length());
writetosocket(*(item->data->workitem), (char*) dt->keylen, dt->byte_length());
//free datatuple
free(dt->keylen);

View file

@ -94,17 +94,17 @@ class logserver
{
public:
//server codes
static uint8_t OP_SUCCESS;
static uint8_t OP_FAIL;
static uint8_t OP_SENDING_TUPLE;
//client codes
static uint8_t OP_FIND;
static uint8_t OP_INSERT;
static uint8_t OP_DONE;
static uint8_t OP_INVALID;
// static uint8_t OP_SUCCESS;
// static uint8_t OP_FAIL;
// static uint8_t OP_SENDING_TUPLE;
//
// //client codes
// static uint8_t OP_FIND;
// static uint8_t OP_INSERT;
//
// static uint8_t OP_DONE;
//
// static uint8_t OP_INVALID;
public:
logserver(int nthreads, int server_port){
@ -139,32 +139,6 @@ public:
void stopserver();
public:
// XXX utility methods, pull out into some other class.
static inline void readfromsocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += read( sockd, buf + n, count - n);
}
}
static inline void writetosocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += write( sockd, buf + n, count - n);
}
}
private:
//main loop of server

47
network.h Normal file
View file

@ -0,0 +1,47 @@
/*
* network.h
*
* Created on: Feb 2, 2010
* Author: sears
*/
#ifndef NETWORK_H_
#define NETWORK_H_
#include <stdio.h>
//server codes
static const uint8_t OP_SUCCESS = 1;
static const uint8_t OP_FAIL = 2;
static const uint8_t OP_SENDING_TUPLE = 3;
//client codes
static const uint8_t OP_FIND = 4;
static const uint8_t OP_INSERT = 5;
static const uint8_t OP_DONE = 6;
static const uint8_t OP_INVALID = 32;
static inline void readfromsocket(int sockd, char *buf, int count)
{
int n = 0;
while( n < count )
{
n += read( sockd, buf + n, count - n);
}
}
static inline void writetosocket(int sockd, char *buf, int count)
{
int n = 0;
while( n < count )
{
n += write( sockd, buf + n, count - n);
}
}
#endif /* NETWORK_H_ */

196
tcpclient.cpp Normal file
View file

@ -0,0 +1,196 @@
/*
* tcpclient.cpp
*
* Created on: Feb 2, 2010
* Author: sears
*/
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include "tcpclient.h"
#include "datatuple.h"
#include "network.h"
// const char *appid;
// const char *region;
struct logstore_handle_t {
char *host;
int portnum;
int timeout;
struct sockaddr_in serveraddr;
struct hostent* server;
int server_socket;
};
//LogStoreDBImpl::LogStoreDBImpl(const TestSettings & testSettings):
// const char *appid, int timeout, const char *region, int portnum){
// host_(testSettings.host()),
// appid_(testSettings.appID()),
// timeout_(testSettings.timeout()),
// region_(testSettings.myRegion()),
// routerLatency_(0.0),
// suLatency_(0.0)
// const std::string& appid_;
// const int timeout_;
// const std::string& region_;
//
// int portnum;
//
// int server_socket;
//
// struct sockaddr_in serveraddr;
// struct hostent *server;
// ret->server_socket = -1;
// portnum = 32432; //this should be an argument.
logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) {
logstore_handle_t *ret = (logstore_handle_t*) malloc(sizeof(*ret));
ret->host = strdup(host);
ret->portnum = portnum;
if(ret->portnum == 0) { ret->portnum = 32432; }
ret->timeout = timeout;
ret->server_socket = -1;
ret->server = gethostbyname(ret->host);
if (ret->server == NULL) {
fprintf(stderr,"ERROR, no such host as %s\n", ret->host);
free(ret->host); free(ret); return 0;
}
/* build the server's Internet address */
bzero((char *) &ret->serveraddr, sizeof(ret->serveraddr));
ret->serveraddr.sin_family = AF_INET;
bcopy((char *)ret->server->h_addr,
(char *)&ret->serveraddr.sin_addr.s_addr, ret->server->h_length);
ret->serveraddr.sin_port = htons(ret->portnum);
printf("LogStore start\n");
return ret;
}
datatuple *
logstore_client_op(logstore_handle_t *l,
// int *server_socket,
// struct sockaddr_in serveraddr,
// struct hostent *server,
uint8_t opcode, datatuple &tuple)
{
if(l->server_socket < 0)
{
l->server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (l->server_socket < 0)
{
printf("ERROR opening socket.\n");
return 0;
}
int flag = 1;
int result = setsockopt(l->server_socket, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical
cruft */
sizeof(int)); /* length of option value */
if (result < 0)
{
printf("ERROR on setting socket option TCP_NODELAY.\n");
return 0;
}
/* connect: create a connection with the server */
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
{
printf("ERROR connecting\n");
return 0;
}
printf("sock opened %d\n", l->server_socket);
}
//send the opcode
int n = write(l->server_socket, (byte*) &opcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
//send the tuple
n = write(l->server_socket, (byte*) tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
n = write(l->server_socket, (byte*) tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
writetosocket(l->server_socket, (char*) tuple.key, *tuple.keylen);
if(!tuple.isDelete() && *tuple.datalen != 0)
writetosocket(l->server_socket, (char*) tuple.data, *tuple.datalen);
//printf("\nssocket %d ", *server_socket);
//read the reply code
uint8_t rcode;
n = read(l->server_socket, (byte*) &rcode, sizeof(uint8_t));
if( n <= 0 )
{
printf("read err.. conn closed.\n");
close(l->server_socket); //close the connection
l->server_socket = -1;
return 0;
}
//printf("rdone\n");
datatuple * ret;
if(rcode == OP_SENDING_TUPLE)
{
datatuple *rcvdtuple = (datatuple*)malloc(sizeof(datatuple));
//read the keylen
rcvdtuple->keylen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(l->server_socket, (char*) rcvdtuple->keylen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read the datalen
rcvdtuple->datalen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(l->server_socket, (byte*) rcvdtuple->datalen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read key
rcvdtuple->key = (byte*) malloc(*rcvdtuple->keylen);
readfromsocket(l->server_socket, (char*) rcvdtuple->key, *rcvdtuple->keylen);
if(!rcvdtuple->isDelete())
{
//read key
rcvdtuple->data = (byte*) malloc(*rcvdtuple->datalen);
readfromsocket(l->server_socket, (char*) rcvdtuple->data, *rcvdtuple->datalen);
}
ret = rcvdtuple;
} else if(rcode == OP_SUCCESS) {
ret = &tuple;
} else {
ret = 0;
}
return ret;
}
int logstore_client_close(logstore_handle_t* l) {
if(l->server_socket > 0)
{
writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t));
close(l->server_socket);
printf("socket closed %d\n.", l->server_socket);
}
free(l->host);
free(l);
return 0;
}

24
tcpclient.h Normal file
View file

@ -0,0 +1,24 @@
/*
* tcpclient.h
*
* Created on: Feb 2, 2010
* Author: sears
*/
#ifndef TCPCLIENT_H_
#define TCPCLIENT_H_
#include "datatuple.h"
typedef struct logstore_handle_t logstore_handle_t;
logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout);
datatuple * logstore_client_op(logstore_handle_t* l,
uint8_t opcode,
datatuple &tuple);
int logstore_client_close(logstore_handle_t* l);
#endif /* TCPCLIENT_H_ */

View file

@ -6,4 +6,4 @@ CREATE_CHECK(check_merge)
CREATE_CHECK(check_mergelarge)
CREATE_CHECK(check_mergetuple)
CREATE_CHECK(check_rbtree)
CREATE_CHECK(check_tcpclient)
CREATE_CLIENT_EXECUTABLE(check_tcpclient)

View file

@ -11,6 +11,9 @@
#include <time.h>
#include <sys/types.h>
#include "../tcpclient.h"
#include "../network.h"
#include "check_util.h"
#undef begin
@ -23,8 +26,11 @@ static int svrport = 32432;
void insertProbeIter(size_t NUM_ENTRIES)
{
srand(1000);
std::string servername = svrname; //"sherpa4";
int serverport = svrport; //32432;
// std::string servername = svrname; //"sherpa4";
// int serverport = svrport; //32432;
logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);
double delete_freq = .05;
double update_freq = .15;
@ -144,11 +150,13 @@ void insertProbeIter(size_t NUM_ENTRIES)
gettimeofday(&ti_st,0);
//send the data
datatuple * ret = sendTuple(servername, serverport, logserver::OP_INSERT, newtuple);
// datatuple * ret = sendTuple(servername, serverport, OP_INSERT, newtuple);
datatuple * ret = logstore_client_op(l, OP_INSERT, newtuple);
assert(ret);
gettimeofday(&ti_end,0);
insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
// insert_time += tv_to_double(ti_end) - tv_to_double(ti_st);
insert_time ++; // XXX
free(newtuple.key);
free(newtuple.data);
@ -159,13 +167,14 @@ void insertProbeIter(size_t NUM_ENTRIES)
}
gettimeofday(&stop_tv,0);
printf("insert time: %6.1f\n", insert_time);
printf("insert time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
printf("insert time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv)));
printf("#deletions: %d\n#updates: %d\n", delcount, upcount);
printf("Stage 2: Looking up %d keys:\n", NUM_ENTRIES);
int found_tuples=0;
for(int i=NUM_ENTRIES-1; i>=0; i--)
{
@ -186,7 +195,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
memcpy((byte*)searchtuple.key, (*key_arr)[ri].c_str(), keylen);
//find the key with the given tuple
datatuple *dt = sendTuple(servername, serverport, logserver::OP_FIND,
datatuple *dt = logstore_client_op(l, OP_FIND, //servername, serverport, OP_FIND,
searchtuple);
assert(dt!=0);
@ -213,8 +222,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
delete key_arr;
//delete data_arr;
logstore_client_close(l);
gettimeofday(&stop_tv,0);
printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv)));
printf("run time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv)));
}

View file

@ -153,95 +153,4 @@ void preprandstr(int count, std::vector<std::string> &arr, int avg_len=50, bool
}
datatuple * sendTuple(std::string & servername, int serverport, uint8_t opcode, datatuple &tuple)
{
struct sockaddr_in serveraddr;
struct hostent *server;
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
perror("ERROR opening socket.\n");
return 0;
}
server = gethostbyname(servername.c_str());
if (server == NULL) {
fprintf(stderr,"ERROR, no such host as %s\n", servername.c_str());
close(sockfd);
exit(0);
}
/* build the server's Internet address */
bzero((char *) &serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
bcopy((char *)server->h_addr,
(char *)&serveraddr.sin_addr.s_addr, server->h_length);
serveraddr.sin_port = htons(serverport);
/* connect: create a connection with the server */
if (connect(sockfd, (sockaddr*) &serveraddr, sizeof(serveraddr)) < 0)
{
perror("ERROR connecting\n");
close(sockfd);
return 0;
}
//send the opcode
int n = write(sockfd, (byte*) &opcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
//send the tuple
n = write(sockfd, (byte*) tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
n = write(sockfd, (byte*) tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
logserver::writetosocket(sockfd, (byte*) tuple.key, *tuple.keylen);
if(!tuple.isDelete() && *tuple.datalen != 0)
logserver::writetosocket(sockfd, (byte*) tuple.data, *tuple.datalen);
//read the reply code
uint8_t rcode;
n = read(sockfd, (byte*) &rcode, sizeof(uint8_t));
datatuple * ret;
if(rcode == logserver::OP_SENDING_TUPLE)
{
datatuple *rcvdtuple = (datatuple*)malloc(sizeof(datatuple));
//read the keylen
rcvdtuple->keylen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(sockfd, (byte*) rcvdtuple->keylen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read the datalen
rcvdtuple->datalen = (uint32_t*) malloc(sizeof(uint32_t));
n = read(sockfd, (byte*) rcvdtuple->datalen, sizeof(uint32_t));
assert(n == sizeof(uint32_t));
//read key
rcvdtuple->key = (byte*) malloc(*rcvdtuple->keylen);
logserver::readfromsocket(sockfd, (byte*) rcvdtuple->key, *rcvdtuple->keylen);
if(!rcvdtuple->isDelete())
{
//read key
rcvdtuple->data = (byte*) malloc(*rcvdtuple->datalen);
logserver::readfromsocket(sockfd, (byte*) rcvdtuple->data, *rcvdtuple->datalen);
}
ret = rcvdtuple;
} else if(rcode == logserver::OP_SUCCESS) {
ret = &tuple;
} else {
ret = 0;
}
close(sockfd);
return ret;
}
#endif /* CHECK_UTIL_H_ */