From bded48a306b67ddc3ca821b4aa379ac1c4e11392 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 11 Aug 2010 22:46:55 +0000 Subject: [PATCH] add new server implementation that uses FILE to buffer I/O, and handles a smaller, fixed number of connections -- leads to an almost 4x speedup on no-ops on nehalem git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@986 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- CMakeLists.txt | 3 +- logserver.cpp | 2 +- network.h | 40 ++++++------ newserver.cpp | 76 +++++++++++++++++++++++ requestDispatch.cpp | 76 +++++++++++++++++++---- requestDispatch.h | 20 +++++- simpleServer.cpp | 146 ++++++++++++++++++++++++++++++++++++++++++++ simpleServer.h | 32 ++++++++++ tcpclient.cpp | 14 ++--- 9 files changed, 366 insertions(+), 43 deletions(-) create mode 100644 newserver.cpp create mode 100644 simpleServer.cpp create mode 100644 simpleServer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 56a1d81..e2dc9bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -76,7 +76,8 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" ) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) IF ( HAVE_STASIS ) - ADD_LIBRARY(logstore requestDispatch.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) + ADD_LIBRARY(logstore requestDispatch.cpp simpleServer.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp) CREATE_EXECUTABLE(server) + CREATE_EXECUTABLE(newserver) ENDIF ( HAVE_STASIS ) ADD_LIBRARY(logstore_client tcpclient.cpp) diff --git a/logserver.cpp b/logserver.cpp index 8da0650..1c4c854 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -462,7 +462,7 @@ void * thread_work_fn( void * args) if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); } //step 3: process the tuple - if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); } + if(!err) { err = requestDispatch::dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); } //free the tuple if(tuple) datatuple::freetuple(tuple); diff --git a/network.h b/network.h index 004ea91..a3198aa 100644 --- a/network.h +++ b/network.h @@ -63,7 +63,7 @@ typedef enum { LOGSTORE_SERVER_RESPONSE } logstore_opcode_type; -static inline int freadfromsocket(FILE * sockf, void *buf, ssize_t count) { +static inline int readfromsocket(FILE * sockf, void *buf, ssize_t count) { ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf); if(i != count) { if(feof(sockf)) { @@ -101,7 +101,7 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count) } -static inline int fwritetosocket(FILE * sockf, const void *buf, ssize_t count) { +static inline int writetosocket(FILE * sockf, const void *buf, ssize_t count) { ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf); if(i != count) { if(feof(sockf)) { @@ -147,7 +147,7 @@ static inline bool opisresponse(network_op_t op) { return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE); } -static inline network_op_t freadopfromsocket(FILE * sockf, logstore_opcode_type type) { +static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type type) { network_op_t ret; ssize_t n = fread(&ret, sizeof(network_op_t), 1, sockf); if(n == sizeof(network_op_t)) { @@ -221,9 +221,9 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type } return ret; } -static inline int fwriteoptosocket(FILE * sockf, network_op_t op) { +static inline int writeoptosocket(FILE * sockf, network_op_t op) { assert(opiserror(op) || opisrequest(op) || opisresponse(op)); - return fwritetosocket(sockf, &op, sizeof(network_op_t)); + return writetosocket(sockf, &op, sizeof(network_op_t)); } static inline int writeoptosocket(int sockd, network_op_t op) { assert(opiserror(op) || opisrequest(op) || opisresponse(op)); @@ -241,18 +241,18 @@ static inline int writeoptosocket(int sockd, network_op_t op) { */ -static inline datatuple* freadtuplefromsocket(FILE * sockf, int * err) { +static inline datatuple* readtuplefromsocket(FILE * sockf, int * err) { len_t keylen, datalen, buflen; - if(( *err = freadfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL; + if(( *err = readfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL; if(keylen == DELETE) return NULL; // *err is zero. - if(( *err = freadfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL; + if(( *err = readfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL; buflen = datatuple::length_from_header(keylen, datalen); byte* bytes = (byte*) malloc(buflen); - if(( *err = freadfromsocket(sockf, bytes, buflen) )) return NULL; + if(( *err = readfromsocket(sockf, bytes, buflen) )) return NULL; return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. } @@ -278,24 +278,24 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) { return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. } -static inline int fwriteendofiteratortosocket(FILE * sockf) { - return fwritetosocket(sockf, &DELETE, sizeof(DELETE)); +static inline int writeendofiteratortosocket(FILE * sockf) { + return writetosocket(sockf, &DELETE, sizeof(DELETE)); } static inline int writeendofiteratortosocket(int sockd) { return writetosocket(sockd, &DELETE, sizeof(DELETE)); } -static inline int fwritetupletosocket(FILE * sockf, const datatuple *tup) { +static inline int writetupletosocket(FILE * sockf, const datatuple *tup) { len_t keylen, datalen; int err; if(tup == NULL) { - if(( err = fwriteendofiteratortosocket(sockf) )) return err; + if(( err = writeendofiteratortosocket(sockf) )) return err; } else { const byte* buf = tup->get_bytes(&keylen, &datalen); - if(( err = fwritetosocket(sockf, &keylen, sizeof(keylen)) )) return err; - if(( err = fwritetosocket(sockf, &datalen, sizeof(datalen)) )) return err; - if(( err = fwritetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + if(( err = writetosocket(sockf, &keylen, sizeof(keylen)) )) return err; + if(( err = writetosocket(sockf, &datalen, sizeof(datalen)) )) return err; + if(( err = writetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err; } return 0; } @@ -314,9 +314,9 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) { return 0; } -static inline uint64_t freadcountfromsocket(FILE* sockf, int *err) { +static inline uint64_t readcountfromsocket(FILE* sockf, int *err) { uint64_t ret; - *err = freadfromsocket(sockf, &ret, sizeof(ret)); + *err = readfromsocket(sockf, &ret, sizeof(ret)); return ret; } static inline uint64_t readcountfromsocket(int sockd, int *err) { @@ -324,8 +324,8 @@ static inline uint64_t readcountfromsocket(int sockd, int *err) { *err = readfromsocket(sockd, &ret, sizeof(ret)); return ret; } -static inline int fwritecounttosocket(FILE* sockf, uint64_t count) { - return fwritetosocket(sockf, &count, sizeof(count)); +static inline int writecounttosocket(FILE* sockf, uint64_t count) { + return writetosocket(sockf, &count, sizeof(count)); } static inline int writecounttosocket(int sockd, uint64_t count) { return writetosocket(sockd, &count, sizeof(count)); diff --git a/newserver.cpp b/newserver.cpp new file mode 100644 index 0000000..c57978a --- /dev/null +++ b/newserver.cpp @@ -0,0 +1,76 @@ +#include +#undef end +#undef try +#undef begin + +#include "merger.h" +#include "logstore.h" +#include "simpleServer.h" + +/* + * newserver.cpp + * + * Created on: Aug 11, 2010 + * Author: sears + */ + +int main(int argc, char *argv[]) +{ + + logtable::init_stasis(); + + int xid = Tbegin(); + + merge_scheduler * mscheduler = new merge_scheduler; + + logtable ltable; + + recordid table_root = ROOT_RECORD; + + int64_t c0_size = 1024 * 1024 * 512 * 1; + + if(argc == 2 && !strcmp(argv[1], "--test")) { + c0_size = 1024 * 1024 * 100; + printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server? + } + + if(argc == 2 && !strcmp(argv[1], "--benchmark")) { + c0_size = 1024 * 1024 * 1024 * 1; + printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server? + } + + if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { + printf("Creating empty logstore\n"); + table_root = ltable.allocTable(xid); + assert(table_root.page == ROOT_RECORD.page && + table_root.slot == ROOT_RECORD.slot); + } else { + printf("Opened existing logstore\n"); + table_root.size = TrecordSize(xid, ROOT_RECORD); + ltable.openTable(xid, table_root); + } + + Tcommit(xid); + + int lindex = mscheduler->addlogtable(<able); + ltable.setMergeData(mscheduler->getMergeData(lindex)); + + mscheduler->startlogtable(lindex, c0_size); + + simpleServer *lserver = new simpleServer(<able); + + lserver->acceptLoop(); + + printf ("Stopping server...\n"); + delete lserver; + + printf("Stopping merge threads...\n"); + mscheduler->shutdown(); + delete mscheduler; + + printf("Deinitializing stasis...\n"); + fflush(stdout); + logtable::deinit_stasis(); + + printf("Shutdown complete\n"); +} diff --git a/requestDispatch.cpp b/requestDispatch.cpp index b0900f3..2f6a89b 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -7,13 +7,15 @@ #include "requestDispatch.h" #include "regionAllocator.h" -static inline int op_insert(logtable * ltable, int fd, datatuple * tuple) { +template +inline int requestDispatch::op_insert(logtable * ltable, HANDLE fd, datatuple * tuple) { //insert/update/delete ltable->insertTuple(tuple); //step 4: send response return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } -static inline int op_find(logtable * ltable, int fd, datatuple * tuple) { +template +inline int requestDispatch::op_find(logtable * ltable, HANDLE fd, datatuple * tuple) { //find the tuple datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); @@ -57,7 +59,8 @@ static inline int op_find(logtable * ltable, int fd, datatuple * tupl } return err; } -static inline int op_scan(logtable * ltable, int fd, datatuple * tuple, datatuple * tuple2, size_t limit) { +template +inline int requestDispatch::op_scan(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit) { size_t count = 0; int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES); @@ -81,15 +84,18 @@ static inline int op_scan(logtable * ltable, int fd, datatuple * tupl if(!err) { writeendofiteratortosocket(fd); } return err; } -static inline int op_flush(logtable * ltable, int fd) { +template +inline int requestDispatch::op_flush(logtable * ltable, HANDLE fd) { ltable->flushTable(); return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } -static inline int op_shutdown(logtable * ltable, int fd) { +template +inline int requestDispatch::op_shutdown(logtable * ltable, HANDLE fd) { // XXX return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR); } -static inline int op_stat_space_usage(logtable * ltable, int fd) { +template +inline int requestDispatch::op_stat_space_usage(logtable * ltable, HANDLE fd) { int xid = Tbegin(); @@ -186,12 +192,14 @@ static inline int op_stat_space_usage(logtable * ltable, int fd) { return err; } -static inline int op_stat_perf_report(logtable * ltable, int fd) { +template +inline int requestDispatch::op_stat_perf_report(logtable * ltable, HANDLE fd) { } -static inline int op_stat_histogram(logtable * ltable, int fd, size_t limit) { +template +inline int requestDispatch::op_stat_histogram(logtable * ltable, HANDLE fd, size_t limit) { if(limit < 3) { return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR); @@ -248,7 +256,8 @@ static inline int op_stat_histogram(logtable * ltable, int fd, size_t Tcommit(xid); return err; } -static inline int op_dbg_blockmap(logtable * ltable, int fd) { +template +inline int requestDispatch::op_dbg_blockmap(logtable * ltable, HANDLE fd) { // produce a list of stasis regions int xid = Tbegin(); @@ -351,7 +360,8 @@ static inline int op_dbg_blockmap(logtable * ltable, int fd) { return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } -static inline int op_dbg_drop_database(logtable * ltable, int fd) { +template +inline int requestDispatch::op_dbg_drop_database(logtable * ltable, HANDLE fd) { logtable::iterator * itr = new logtable::iterator(ltable); datatuple * del; fprintf(stderr, "DROPPING DATABASE...\n"); @@ -376,11 +386,50 @@ static inline int op_dbg_drop_database(logtable * ltable, int fd) { fprintf(stderr, "...DROP DATABASE COMPLETE\n"); return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } -static inline int op_dbg_noop(logtable * ltable, int fd) { +template +inline int requestDispatch::op_dbg_noop(logtable * ltable, HANDLE fd) { return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } -int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, int fd) { +template +int requestDispatch::dispatch_request(HANDLE f, logtable*ltable) { + //step 1: read the opcode + network_op_t opcode = readopfromsocket(f, LOGSTORE_CLIENT_REQUEST); + if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { + opcode = OP_DONE; + printf("Broken client closed connection uncleanly\n"); + } + + int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure + + //step 2: read the first tuple from client + datatuple *tuple = 0, *tuple2 = 0; + if(!err) { tuple = readtuplefromsocket(f, &err); } + // read the second tuple from client + if(!err) { tuple2 = readtuplefromsocket(f, &err); } + + //step 3: process the tuple + if(!err) { err = dispatch_request(opcode, tuple, tuple2, ltable, f); } + + //free the tuple + if(tuple) datatuple::freetuple(tuple); + if(tuple2) datatuple::freetuple(tuple2); + + // Deal with old work_queue item by freeing it or putting it back in the queue. + + if(err) { + if(opcode != OP_DONE) { + perror("network error. conn closed"); + } else { +// printf("client done. conn closed. (%d, %d)\n", +// *(item->data->workitem), item->data->work_queue->size()); + } + } + return err; + +} +template +int requestDispatch::dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, HANDLE fd) { int err = 0; if(opcode == OP_INSERT) { @@ -429,3 +478,6 @@ int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, } return err; } + +template class requestDispatch; +template class requestDispatch; diff --git a/requestDispatch.h b/requestDispatch.h index 0f59c9f..c30c9db 100644 --- a/requestDispatch.h +++ b/requestDispatch.h @@ -10,7 +10,23 @@ #include "network.h" #include "datatuple.h" #include "logstore.h" +template +class requestDispatch { +private: + static inline int op_insert(logtable * ltable, HANDLE fd, datatuple * tuple); + static inline int op_find(logtable * ltable, HANDLE fd, datatuple * tuple); + static inline int op_scan(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit); + static inline int op_flush(logtable * ltable, HANDLE fd); + static inline int op_shutdown(logtable * ltable, HANDLE fd); + static inline int op_stat_space_usage(logtable * ltable, HANDLE fd); + static inline int op_stat_perf_report(logtable * ltable, HANDLE fd); + static inline int op_stat_histogram(logtable * ltable, HANDLE fd, size_t limit); + static inline int op_dbg_blockmap(logtable * ltable, HANDLE fd); + static inline int op_dbg_drop_database(logtable * ltable, HANDLE fd); + static inline int op_dbg_noop(logtable * ltable, HANDLE fd); -int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * logstore, int fd); - +public: + static int dispatch_request(HANDLE f, logtable * ltable); + static int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, HANDLE fd); +}; #endif /* REQUESTDISPATCH_H_ */ diff --git a/simpleServer.cpp b/simpleServer.cpp new file mode 100644 index 0000000..bf10949 --- /dev/null +++ b/simpleServer.cpp @@ -0,0 +1,146 @@ +/* + * simpleServer.cpp + * + * Created on: Aug 11, 2010 + * Author: sears + */ + +#include "simpleServer.h" +#include "requestDispatch.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +typedef struct { + simpleServer * obj; + int self; +} worker_arg; + +void * worker_wrap(void * arg) { + ((worker_arg*)arg)->obj->worker(((worker_arg*)arg)->self); + free(arg); + return 0; +} + +void * simpleServer::worker(int self) { + while(true) { + pthread_mutex_lock(&thread_mut[self]); + thread_fd[self] = -1; + while(thread_fd[self] == -1) { + if(!running) { return 0; } + pthread_cond_wait(&thread_cond[self], &thread_mut[self]); + } + pthread_mutex_unlock(&thread_mut[self]); + FILE * f = fdopen(thread_fd[self], "a+"); + while(!requestDispatch::dispatch_request(f, ltable)) { } + fclose(f); + } +} + +simpleServer::simpleServer(logtable * ltable, int max_threads, int port): + ltable(ltable), + port(port), + max_threads(max_threads), + thread_fd((int*)malloc(sizeof(*thread_fd)*max_threads)), + thread_cond((pthread_cond_t*)malloc(sizeof(*thread_cond)*max_threads)), + thread_mut((pthread_mutex_t*)malloc(sizeof(*thread_mut)*max_threads)), + thread((pthread_t*)malloc(sizeof(*thread)*max_threads)), + running(true) { + for(int i = 0; i < max_threads; i++) { + thread_fd[i] = -1; + pthread_cond_init(&thread_cond[i], 0); + pthread_mutex_init(&thread_mut[i], 0); + worker_arg * arg = (worker_arg*)malloc(sizeof(worker_arg)); + arg->obj = this; + arg->self = i; + pthread_create(&thread[i], 0, worker_wrap, (void*)arg); + } +} + +bool simpleServer::acceptLoop() { + + int sockfd; + struct sockaddr_in serv_addr; + struct sockaddr_in cli_addr; + int newsockfd; + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if(sockfd == -1) { + perror("ERROR opening socket"); + return false; + } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // XXX security... + serv_addr.sin_port = htons(port); + + if(bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) { + perror("ERROR on binding"); + return false; + } + if(listen(sockfd,SOMAXCONN)==-1) { + perror("ERROR on listen"); + return false; + } + printf("LSM Server listening....\n"); + +// *(sdata->server_socket) = sockfd; + int flag, result; + + while(true) { + socklen_t clilen = sizeof(cli_addr); + + newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); + if(newsockfd == -1) { + perror("ERROR on accept"); + } else { + flag = 1; + result = setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(int)); + if(result == -1) { + perror("ERROR on setting socket option TCP_NODELAY"); + // ignore the error. + } + // char clientip[20]; + // inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20); + // printf("Connection from %s\n", clientip); + int i; + for(i = 0; i < max_threads; i++) { //TODO round robin or something? (currently, we don't care if connect() is slow...) + pthread_mutex_lock(&thread_mut[i]); + if(thread_fd[i] == -1) { + thread_fd[i] = newsockfd; + DEBUG("connect %d\n", i); + pthread_cond_signal(&thread_cond[i]); + pthread_mutex_unlock(&thread_mut[i]); + break; + } else { + pthread_mutex_unlock(&thread_mut[i]); + } + } + if(i == max_threads) { + printf("all threads are busy. rejecting connection\n"); + close(newsockfd); + } + } + } +} +simpleServer::~simpleServer() { + running = false; + for(int i = 0; i < max_threads; i++) { + pthread_cond_signal(&thread_cond[i]); + pthread_join(thread[i],0); + pthread_mutex_destroy(&thread_mut[i]); + pthread_cond_destroy(&thread_cond[i]); + } + free(thread); + free(thread_mut); + free(thread_cond); + free(thread_fd); +} diff --git a/simpleServer.h b/simpleServer.h new file mode 100644 index 0000000..d48504d --- /dev/null +++ b/simpleServer.h @@ -0,0 +1,32 @@ +/* + * simpleServer.h + * + * Created on: Aug 11, 2010 + * Author: sears + */ + +#ifndef SIMPLESERVER_H_ +#define SIMPLESERVER_H_ +#include "logstore.h" + +class simpleServer { +public: + void* worker(int /*handle*/); + static const int DEFAULT_PORT = 32432; + static const int DEFAULT_THREADS = 1000; + + simpleServer(logtable * ltable, int max_threads = DEFAULT_THREADS, int port = DEFAULT_PORT); + bool acceptLoop(); + ~simpleServer(); +private: + logtable* ltable; + int port; + int max_threads; + int * thread_fd; + pthread_cond_t * thread_cond; + pthread_mutex_t * thread_mut; + pthread_t * thread; + bool running; +}; + +#endif /* SIMPLESERVER_H_ */ diff --git a/tcpclient.cpp b/tcpclient.cpp index d6d1609..7151260 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -107,22 +107,22 @@ logstore_client_op_returns_many(logstore_handle_t *l, network_op_t err = 0; //send the opcode - if( !err) { err = fwritetosocket(l->server_fsocket, &opcode, sizeof(opcode)); } + if( !err) { err = writetosocket(l->server_fsocket, &opcode, sizeof(opcode)); } //send the first tuple - if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple); } + if( !err) { err = writetupletosocket(l->server_fsocket, tuple); } //send the second tuple - if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple2); } + if( !err) { err = writetupletosocket(l->server_fsocket, tuple2); } if( (!err) && (count != (uint64_t)-1) ) { - err = fwritecounttosocket(l->server_fsocket, count); } + err = writecounttosocket(l->server_fsocket, count); } fflush_unlocked(l->server_fsocket); network_op_t rcode = LOGSTORE_CONN_CLOSED_ERROR; if( !err) { - rcode = freadopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE); + rcode = readopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE); } if( opiserror(rcode) ) { close_conn(l); } @@ -134,7 +134,7 @@ datatuple * logstore_client_next_tuple(logstore_handle_t *l) { assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value... int err = 0; - datatuple * ret = freadtuplefromsocket(l->server_fsocket, &err); + datatuple * ret = readtuplefromsocket(l->server_fsocket, &err); if(err) { close_conn(l); if(ret) { @@ -179,7 +179,7 @@ logstore_client_op(logstore_handle_t *l, int logstore_client_close(logstore_handle_t* l) { if(l->server_fsocket) { - fwritetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t)); + writetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t)); fclose(l->server_fsocket); DEBUG("socket closed %d\n.", l->server_fsocket);