From 4a42e4f904bb92203595342562a02a8fa418f894 Mon Sep 17 00:00:00 2001 From: sears Date: Tue, 10 Aug 2010 23:56:54 +0000 Subject: [PATCH] tcpclient now usees fread/fwrite instead of read/write git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@983 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- network.h | 119 +++++++++++++++++++++++++++++++++++++++++++++++++- tcpclient.cpp | 41 +++++++++-------- 2 files changed, 140 insertions(+), 20 deletions(-) diff --git a/network.h b/network.h index 1d566c5..004ea91 100644 --- a/network.h +++ b/network.h @@ -63,6 +63,23 @@ typedef enum { LOGSTORE_SERVER_RESPONSE } logstore_opcode_type; +static inline int freadfromsocket(FILE * sockf, void *buf, ssize_t count) { + ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf); + if(i != count) { + if(feof(sockf)) { + errno = EOF; + return EOF; + } else if(ferror(sockf)) { + perror("readfromsocket failed"); + errno = -1; + return -1; + } + printf("logic bug? unreported short read?\n"); + errno = EOF; + return EOF; + } + return 0; +} static inline int readfromsocket(int sockd, void *buf, ssize_t count) { @@ -84,6 +101,23 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count) } +static inline int fwritetosocket(FILE * sockf, const void *buf, ssize_t count) { + ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf); + if(i != count) { + if(feof(sockf)) { + errno = EOF; + return errno; + } else if(ferror(sockf)) { + perror("writetosocket failed"); + errno = -1; + return -1; + } + printf("logic error? unreported short write?\n"); + errno = EOF; + return EOF; + } + return 0; +} static inline int writetosocket(int sockd, const void *buf, ssize_t count) { ssize_t n = 0; @@ -91,7 +125,7 @@ static inline int writetosocket(int sockd, const void *buf, ssize_t count) while( n < count ) { ssize_t i = write( sockd, ((byte*)buf) + n, count - n); - if(i == -1) { + if(i == -1) { // XXX not threadsafe! perror("writetosocket failed"); return errno; } else if(i == 0) { @@ -113,6 +147,43 @@ static inline bool opisresponse(network_op_t op) { return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE); } +static inline network_op_t freadopfromsocket(FILE * sockf, logstore_opcode_type type) { + network_op_t ret; + ssize_t n = fread(&ret, sizeof(network_op_t), 1, sockf); + if(n == sizeof(network_op_t)) { + // done. + } else if(n == 0) { // EOF + perror("Socket closed mid request."); + return LOGSTORE_CONN_CLOSED_ERROR; + } else { + assert(n == -1); // sizeof(network_op_t) is 1, so short reads are impossible. + perror("Could not read opcode from socket"); + return LOGSTORE_SOCKET_ERROR; + } + // sanity checking + switch(type) { + case LOGSTORE_CLIENT_REQUEST: { + if(!(opisrequest(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid request code %d\n", (int)ret); + if(opisresponse(ret)) { + fprintf(stderr, "(also, the request code is a valid response code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + } break; + case LOGSTORE_SERVER_RESPONSE: { + if(!(opisresponse(ret) || opiserror(ret))) { + fprintf(stderr, "Read invalid response code %d\n", (int)ret); + if(opisrequest(ret)) { + fprintf(stderr, "(also, the response code is a valid request code)\n"); + } + ret = LOGSTORE_PROTOCOL_ERROR; + } + + } + } + return ret; +} static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type) { network_op_t ret; ssize_t n = read(sockd, &ret, sizeof(network_op_t)); @@ -150,6 +221,10 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type } return ret; } +static inline int fwriteoptosocket(FILE * sockf, network_op_t op) { + assert(opiserror(op) || opisrequest(op) || opisresponse(op)); + return fwritetosocket(sockf, &op, sizeof(network_op_t)); +} static inline int writeoptosocket(int sockd, network_op_t op) { assert(opiserror(op) || opisrequest(op) || opisresponse(op)); return writetosocket(sockd, &op, sizeof(network_op_t)); @@ -166,6 +241,22 @@ static inline int writeoptosocket(int sockd, network_op_t op) { */ +static inline datatuple* freadtuplefromsocket(FILE * sockf, int * err) { + + len_t keylen, datalen, buflen; + + if(( *err = freadfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL; + if(keylen == DELETE) return NULL; // *err is zero. + if(( *err = freadfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL; + + buflen = datatuple::length_from_header(keylen, datalen); + byte* bytes = (byte*) malloc(buflen); + + if(( *err = freadfromsocket(sockf, bytes, buflen) )) return NULL; + + return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. +} + /** @param sockd The socket. @param error will be set to zero on succes, a logstore error number on failure @@ -187,9 +278,27 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) { return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. } +static inline int fwriteendofiteratortosocket(FILE * sockf) { + return fwritetosocket(sockf, &DELETE, sizeof(DELETE)); +} + static inline int writeendofiteratortosocket(int sockd) { return writetosocket(sockd, &DELETE, sizeof(DELETE)); } +static inline int fwritetupletosocket(FILE * sockf, const datatuple *tup) { + len_t keylen, datalen; + int err; + + if(tup == NULL) { + if(( err = fwriteendofiteratortosocket(sockf) )) return err; + } else { + const byte* buf = tup->get_bytes(&keylen, &datalen); + if(( err = fwritetosocket(sockf, &keylen, sizeof(keylen)) )) return err; + if(( err = fwritetosocket(sockf, &datalen, sizeof(datalen)) )) return err; + if(( err = fwritetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + } + return 0; +} static inline int writetupletosocket(int sockd, const datatuple* tup) { len_t keylen, datalen; int err; @@ -205,11 +314,19 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) { return 0; } +static inline uint64_t freadcountfromsocket(FILE* sockf, int *err) { + uint64_t ret; + *err = freadfromsocket(sockf, &ret, sizeof(ret)); + return ret; +} static inline uint64_t readcountfromsocket(int sockd, int *err) { uint64_t ret; *err = readfromsocket(sockd, &ret, sizeof(ret)); return ret; } +static inline int fwritecounttosocket(FILE* sockf, uint64_t count) { + return fwritetosocket(sockf, &count, sizeof(count)); +} static inline int writecounttosocket(int sockd, uint64_t count) { return writetosocket(sockd, &count, sizeof(count)); } diff --git a/tcpclient.cpp b/tcpclient.cpp index 7b3483a..d6d1609 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -30,6 +30,7 @@ struct logstore_handle_t { struct sockaddr_in serveraddr; struct hostent* server; int server_socket; + FILE * server_fsocket; }; logstore_handle_t * logstore_client_open(const char *host, int portnum, int timeout) { @@ -38,7 +39,8 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time ret->portnum = portnum; if(ret->portnum == 0) { ret->portnum = 32432; } ret->timeout = timeout; - ret->server_socket = -1; + ret->server_socket = -1; + ret->server_fsocket = NULL; ret->server = gethostbyname(ret->host); if (ret->server == NULL) { @@ -59,9 +61,10 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time } static inline void close_conn(logstore_handle_t *l) { - perror("read/write err.. conn closed.\n"); - close(l->server_socket); //close the connection - l->server_socket = -1; + perror("read/write err.. conn closed.\n"); + fclose(l->server_fsocket); //close the connection + l->server_fsocket = NULL; + l->server_socket = -1; } uint8_t @@ -70,15 +73,14 @@ logstore_client_op_returns_many(logstore_handle_t *l, if(l->server_socket < 0) { - l->server_socket = socket(AF_INET, SOCK_STREAM, 0); - + l->server_socket = socket(AF_INET, SOCK_STREAM, 0); + l->server_fsocket = fdopen(l->server_socket, "a+"); if (l->server_socket < 0) { perror("ERROR opening socket.\n"); return 0; } - int flag = 1; int result = setsockopt(l->server_socket, /* socket affected */ IPPROTO_TCP, /* set option at TCP level */ @@ -92,7 +94,6 @@ logstore_client_op_returns_many(logstore_handle_t *l, return 0; } - /* connect: create a connection with the server */ if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0) { @@ -106,20 +107,22 @@ logstore_client_op_returns_many(logstore_handle_t *l, network_op_t err = 0; //send the opcode - if( !err) { err = writetosocket(l->server_socket, &opcode, sizeof(opcode)); } + if( !err) { err = fwritetosocket(l->server_fsocket, &opcode, sizeof(opcode)); } //send the first tuple - if( !err) { err = writetupletosocket(l->server_socket, tuple); } + if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple); } //send the second tuple - if( !err) { err = writetupletosocket(l->server_socket, tuple2); } + if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple2); } if( (!err) && (count != (uint64_t)-1) ) { - err = writecounttosocket(l->server_socket, count); } + err = fwritecounttosocket(l->server_fsocket, count); } + + fflush_unlocked(l->server_fsocket); network_op_t rcode = LOGSTORE_CONN_CLOSED_ERROR; if( !err) { - rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE); + rcode = freadopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE); } if( opiserror(rcode) ) { close_conn(l); } @@ -129,9 +132,9 @@ logstore_client_op_returns_many(logstore_handle_t *l, } datatuple * logstore_client_next_tuple(logstore_handle_t *l) { - assert(l->server_socket != -1); // otherwise, then the client forgot to check a return value... + assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value... int err = 0; - datatuple * ret = readtuplefromsocket(l->server_socket, &err); + datatuple * ret = freadtuplefromsocket(l->server_fsocket, &err); if(err) { close_conn(l); if(ret) { @@ -174,12 +177,12 @@ logstore_client_op(logstore_handle_t *l, } int logstore_client_close(logstore_handle_t* l) { - if(l->server_socket > 0) + if(l->server_fsocket) { - writetosocket(l->server_socket, (char*) &OP_DONE, sizeof(uint8_t)); + fwritetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t)); - close(l->server_socket); - DEBUG("socket closed %d\n.", l->server_socket); + fclose(l->server_fsocket); + DEBUG("socket closed %d\n.", l->server_fsocket); } free(l->host); free(l);