tcpclient now usees fread/fwrite instead of read/write
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@983 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
2a45c2cda8
commit
4a42e4f904
2 changed files with 140 additions and 20 deletions
119
network.h
119
network.h
|
@ -63,6 +63,23 @@ typedef enum {
|
||||||
LOGSTORE_SERVER_RESPONSE
|
LOGSTORE_SERVER_RESPONSE
|
||||||
} logstore_opcode_type;
|
} logstore_opcode_type;
|
||||||
|
|
||||||
|
static inline int freadfromsocket(FILE * sockf, void *buf, ssize_t count) {
|
||||||
|
ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf);
|
||||||
|
if(i != count) {
|
||||||
|
if(feof(sockf)) {
|
||||||
|
errno = EOF;
|
||||||
|
return EOF;
|
||||||
|
} else if(ferror(sockf)) {
|
||||||
|
perror("readfromsocket failed");
|
||||||
|
errno = -1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
printf("logic bug? unreported short read?\n");
|
||||||
|
errno = EOF;
|
||||||
|
return EOF;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static inline int readfromsocket(int sockd, void *buf, ssize_t count)
|
static inline int readfromsocket(int sockd, void *buf, ssize_t count)
|
||||||
{
|
{
|
||||||
|
|
||||||
|
@ -84,6 +101,23 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int fwritetosocket(FILE * sockf, const void *buf, ssize_t count) {
|
||||||
|
ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf);
|
||||||
|
if(i != count) {
|
||||||
|
if(feof(sockf)) {
|
||||||
|
errno = EOF;
|
||||||
|
return errno;
|
||||||
|
} else if(ferror(sockf)) {
|
||||||
|
perror("writetosocket failed");
|
||||||
|
errno = -1;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
printf("logic error? unreported short write?\n");
|
||||||
|
errno = EOF;
|
||||||
|
return EOF;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static inline int writetosocket(int sockd, const void *buf, ssize_t count)
|
static inline int writetosocket(int sockd, const void *buf, ssize_t count)
|
||||||
{
|
{
|
||||||
ssize_t n = 0;
|
ssize_t n = 0;
|
||||||
|
@ -91,7 +125,7 @@ static inline int writetosocket(int sockd, const void *buf, ssize_t count)
|
||||||
while( n < count )
|
while( n < count )
|
||||||
{
|
{
|
||||||
ssize_t i = write( sockd, ((byte*)buf) + n, count - n);
|
ssize_t i = write( sockd, ((byte*)buf) + n, count - n);
|
||||||
if(i == -1) {
|
if(i == -1) { // XXX not threadsafe!
|
||||||
perror("writetosocket failed");
|
perror("writetosocket failed");
|
||||||
return errno;
|
return errno;
|
||||||
} else if(i == 0) {
|
} else if(i == 0) {
|
||||||
|
@ -113,6 +147,43 @@ static inline bool opisresponse(network_op_t op) {
|
||||||
return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE);
|
return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline network_op_t freadopfromsocket(FILE * sockf, logstore_opcode_type type) {
|
||||||
|
network_op_t ret;
|
||||||
|
ssize_t n = fread(&ret, sizeof(network_op_t), 1, sockf);
|
||||||
|
if(n == sizeof(network_op_t)) {
|
||||||
|
// done.
|
||||||
|
} else if(n == 0) { // EOF
|
||||||
|
perror("Socket closed mid request.");
|
||||||
|
return LOGSTORE_CONN_CLOSED_ERROR;
|
||||||
|
} else {
|
||||||
|
assert(n == -1); // sizeof(network_op_t) is 1, so short reads are impossible.
|
||||||
|
perror("Could not read opcode from socket");
|
||||||
|
return LOGSTORE_SOCKET_ERROR;
|
||||||
|
}
|
||||||
|
// sanity checking
|
||||||
|
switch(type) {
|
||||||
|
case LOGSTORE_CLIENT_REQUEST: {
|
||||||
|
if(!(opisrequest(ret) || opiserror(ret))) {
|
||||||
|
fprintf(stderr, "Read invalid request code %d\n", (int)ret);
|
||||||
|
if(opisresponse(ret)) {
|
||||||
|
fprintf(stderr, "(also, the request code is a valid response code)\n");
|
||||||
|
}
|
||||||
|
ret = LOGSTORE_PROTOCOL_ERROR;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case LOGSTORE_SERVER_RESPONSE: {
|
||||||
|
if(!(opisresponse(ret) || opiserror(ret))) {
|
||||||
|
fprintf(stderr, "Read invalid response code %d\n", (int)ret);
|
||||||
|
if(opisrequest(ret)) {
|
||||||
|
fprintf(stderr, "(also, the response code is a valid request code)\n");
|
||||||
|
}
|
||||||
|
ret = LOGSTORE_PROTOCOL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) {
|
static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) {
|
||||||
network_op_t ret;
|
network_op_t ret;
|
||||||
ssize_t n = read(sockd, &ret, sizeof(network_op_t));
|
ssize_t n = read(sockd, &ret, sizeof(network_op_t));
|
||||||
|
@ -150,6 +221,10 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
static inline int fwriteoptosocket(FILE * sockf, network_op_t op) {
|
||||||
|
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
|
||||||
|
return fwritetosocket(sockf, &op, sizeof(network_op_t));
|
||||||
|
}
|
||||||
static inline int writeoptosocket(int sockd, network_op_t op) {
|
static inline int writeoptosocket(int sockd, network_op_t op) {
|
||||||
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
|
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
|
||||||
return writetosocket(sockd, &op, sizeof(network_op_t));
|
return writetosocket(sockd, &op, sizeof(network_op_t));
|
||||||
|
@ -166,6 +241,22 @@ static inline int writeoptosocket(int sockd, network_op_t op) {
|
||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
static inline datatuple* freadtuplefromsocket(FILE * sockf, int * err) {
|
||||||
|
|
||||||
|
len_t keylen, datalen, buflen;
|
||||||
|
|
||||||
|
if(( *err = freadfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL;
|
||||||
|
if(keylen == DELETE) return NULL; // *err is zero.
|
||||||
|
if(( *err = freadfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL;
|
||||||
|
|
||||||
|
buflen = datatuple::length_from_header(keylen, datalen);
|
||||||
|
byte* bytes = (byte*) malloc(buflen);
|
||||||
|
|
||||||
|
if(( *err = freadfromsocket(sockf, bytes, buflen) )) return NULL;
|
||||||
|
|
||||||
|
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@param sockd The socket.
|
@param sockd The socket.
|
||||||
@param error will be set to zero on succes, a logstore error number on failure
|
@param error will be set to zero on succes, a logstore error number on failure
|
||||||
|
@ -187,9 +278,27 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) {
|
||||||
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
|
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int fwriteendofiteratortosocket(FILE * sockf) {
|
||||||
|
return fwritetosocket(sockf, &DELETE, sizeof(DELETE));
|
||||||
|
}
|
||||||
|
|
||||||
static inline int writeendofiteratortosocket(int sockd) {
|
static inline int writeendofiteratortosocket(int sockd) {
|
||||||
return writetosocket(sockd, &DELETE, sizeof(DELETE));
|
return writetosocket(sockd, &DELETE, sizeof(DELETE));
|
||||||
}
|
}
|
||||||
|
static inline int fwritetupletosocket(FILE * sockf, const datatuple *tup) {
|
||||||
|
len_t keylen, datalen;
|
||||||
|
int err;
|
||||||
|
|
||||||
|
if(tup == NULL) {
|
||||||
|
if(( err = fwriteendofiteratortosocket(sockf) )) return err;
|
||||||
|
} else {
|
||||||
|
const byte* buf = tup->get_bytes(&keylen, &datalen);
|
||||||
|
if(( err = fwritetosocket(sockf, &keylen, sizeof(keylen)) )) return err;
|
||||||
|
if(( err = fwritetosocket(sockf, &datalen, sizeof(datalen)) )) return err;
|
||||||
|
if(( err = fwritetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
static inline int writetupletosocket(int sockd, const datatuple* tup) {
|
static inline int writetupletosocket(int sockd, const datatuple* tup) {
|
||||||
len_t keylen, datalen;
|
len_t keylen, datalen;
|
||||||
int err;
|
int err;
|
||||||
|
@ -205,11 +314,19 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
static inline uint64_t freadcountfromsocket(FILE* sockf, int *err) {
|
||||||
|
uint64_t ret;
|
||||||
|
*err = freadfromsocket(sockf, &ret, sizeof(ret));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
static inline uint64_t readcountfromsocket(int sockd, int *err) {
|
static inline uint64_t readcountfromsocket(int sockd, int *err) {
|
||||||
uint64_t ret;
|
uint64_t ret;
|
||||||
*err = readfromsocket(sockd, &ret, sizeof(ret));
|
*err = readfromsocket(sockd, &ret, sizeof(ret));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
static inline int fwritecounttosocket(FILE* sockf, uint64_t count) {
|
||||||
|
return fwritetosocket(sockf, &count, sizeof(count));
|
||||||
|
}
|
||||||
static inline int writecounttosocket(int sockd, uint64_t count) {
|
static inline int writecounttosocket(int sockd, uint64_t count) {
|
||||||
return writetosocket(sockd, &count, sizeof(count));
|
return writetosocket(sockd, &count, sizeof(count));
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,7 @@ struct logstore_handle_t {
|
||||||
struct sockaddr_in serveraddr;
|
struct sockaddr_in serveraddr;
|
||||||
struct hostent* server;
|
struct hostent* server;
|
||||||
int server_socket;
|
int server_socket;
|
||||||
|
FILE * server_fsocket;
|
||||||
};
|
};
|
||||||
|
|
||||||
logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) {
|
logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) {
|
||||||
|
@ -38,7 +39,8 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
|
||||||
ret->portnum = portnum;
|
ret->portnum = portnum;
|
||||||
if(ret->portnum == 0) { ret->portnum = 32432; }
|
if(ret->portnum == 0) { ret->portnum = 32432; }
|
||||||
ret->timeout = timeout;
|
ret->timeout = timeout;
|
||||||
ret->server_socket = -1;
|
ret->server_socket = -1;
|
||||||
|
ret->server_fsocket = NULL;
|
||||||
|
|
||||||
ret->server = gethostbyname(ret->host);
|
ret->server = gethostbyname(ret->host);
|
||||||
if (ret->server == NULL) {
|
if (ret->server == NULL) {
|
||||||
|
@ -59,9 +61,10 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void close_conn(logstore_handle_t *l) {
|
static inline void close_conn(logstore_handle_t *l) {
|
||||||
perror("read/write err.. conn closed.\n");
|
perror("read/write err.. conn closed.\n");
|
||||||
close(l->server_socket); //close the connection
|
fclose(l->server_fsocket); //close the connection
|
||||||
l->server_socket = -1;
|
l->server_fsocket = NULL;
|
||||||
|
l->server_socket = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t
|
uint8_t
|
||||||
|
@ -70,15 +73,14 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
||||||
|
|
||||||
if(l->server_socket < 0)
|
if(l->server_socket < 0)
|
||||||
{
|
{
|
||||||
l->server_socket = socket(AF_INET, SOCK_STREAM, 0);
|
l->server_socket = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
l->server_fsocket = fdopen(l->server_socket, "a+");
|
||||||
if (l->server_socket < 0)
|
if (l->server_socket < 0)
|
||||||
{
|
{
|
||||||
perror("ERROR opening socket.\n");
|
perror("ERROR opening socket.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int flag = 1;
|
int flag = 1;
|
||||||
int result = setsockopt(l->server_socket, /* socket affected */
|
int result = setsockopt(l->server_socket, /* socket affected */
|
||||||
IPPROTO_TCP, /* set option at TCP level */
|
IPPROTO_TCP, /* set option at TCP level */
|
||||||
|
@ -92,7 +94,6 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* connect: create a connection with the server */
|
/* connect: create a connection with the server */
|
||||||
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
|
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
|
||||||
{
|
{
|
||||||
|
@ -106,20 +107,22 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
||||||
network_op_t err = 0;
|
network_op_t err = 0;
|
||||||
|
|
||||||
//send the opcode
|
//send the opcode
|
||||||
if( !err) { err = writetosocket(l->server_socket, &opcode, sizeof(opcode)); }
|
if( !err) { err = fwritetosocket(l->server_fsocket, &opcode, sizeof(opcode)); }
|
||||||
|
|
||||||
//send the first tuple
|
//send the first tuple
|
||||||
if( !err) { err = writetupletosocket(l->server_socket, tuple); }
|
if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple); }
|
||||||
|
|
||||||
//send the second tuple
|
//send the second tuple
|
||||||
if( !err) { err = writetupletosocket(l->server_socket, tuple2); }
|
if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple2); }
|
||||||
|
|
||||||
if( (!err) && (count != (uint64_t)-1) ) {
|
if( (!err) && (count != (uint64_t)-1) ) {
|
||||||
err = writecounttosocket(l->server_socket, count); }
|
err = fwritecounttosocket(l->server_fsocket, count); }
|
||||||
|
|
||||||
|
fflush_unlocked(l->server_fsocket);
|
||||||
|
|
||||||
network_op_t rcode = LOGSTORE_CONN_CLOSED_ERROR;
|
network_op_t rcode = LOGSTORE_CONN_CLOSED_ERROR;
|
||||||
if( !err) {
|
if( !err) {
|
||||||
rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE);
|
rcode = freadopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
if( opiserror(rcode) ) { close_conn(l); }
|
if( opiserror(rcode) ) { close_conn(l); }
|
||||||
|
@ -129,9 +132,9 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
||||||
}
|
}
|
||||||
datatuple *
|
datatuple *
|
||||||
logstore_client_next_tuple(logstore_handle_t *l) {
|
logstore_client_next_tuple(logstore_handle_t *l) {
|
||||||
assert(l->server_socket != -1); // otherwise, then the client forgot to check a return value...
|
assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value...
|
||||||
int err = 0;
|
int err = 0;
|
||||||
datatuple * ret = readtuplefromsocket(l->server_socket, &err);
|
datatuple * ret = freadtuplefromsocket(l->server_fsocket, &err);
|
||||||
if(err) {
|
if(err) {
|
||||||
close_conn(l);
|
close_conn(l);
|
||||||
if(ret) {
|
if(ret) {
|
||||||
|
@ -174,12 +177,12 @@ logstore_client_op(logstore_handle_t *l,
|
||||||
}
|
}
|
||||||
|
|
||||||
int logstore_client_close(logstore_handle_t* l) {
|
int logstore_client_close(logstore_handle_t* l) {
|
||||||
if(l->server_socket > 0)
|
if(l->server_fsocket)
|
||||||
{
|
{
|
||||||
writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t));
|
fwritetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t));
|
||||||
|
|
||||||
close(l->server_socket);
|
fclose(l->server_fsocket);
|
||||||
DEBUG("socket closed %d\n.", l->server_socket);
|
DEBUG("socket closed %d\n.", l->server_fsocket);
|
||||||
}
|
}
|
||||||
free(l->host);
|
free(l->host);
|
||||||
free(l);
|
free(l);
|
||||||
|
|
Loading…
Reference in a new issue