add new server implementation that uses FILE to buffer I/O, and handles a smaller, fixed number of connections -- leads to an almost 4x speedup on no-ops on nehalem

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@986 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-08-11 22:46:55 +00:00
parent 569b46363b
commit bded48a306
9 changed files with 366 additions and 43 deletions

View file

@ -76,7 +76,8 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" )
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
IF ( HAVE_STASIS )
ADD_LIBRARY(logstore requestDispatch.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp)
ADD_LIBRARY(logstore requestDispatch.cpp simpleServer.cpp logserver.cpp logstore.cpp diskTreeComponent.cpp memTreeComponent.cpp datapage.cpp merger.cpp tuplemerger.cpp mergeStats.cpp mergeManager.cpp)
CREATE_EXECUTABLE(server)
CREATE_EXECUTABLE(newserver)
ENDIF ( HAVE_STASIS )
ADD_LIBRARY(logstore_client tcpclient.cpp)

View file

@ -462,7 +462,7 @@ void * thread_work_fn( void * args)
if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); }
//step 3: process the tuple
if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); }
if(!err) { err = requestDispatch<int>::dispatch_request(opcode, tuple, tuple2, item->data->ltable, *(item->data->workitem)); }
//free the tuple
if(tuple) datatuple::freetuple(tuple);

View file

@ -63,7 +63,7 @@ typedef enum {
LOGSTORE_SERVER_RESPONSE
} logstore_opcode_type;
static inline int freadfromsocket(FILE * sockf, void *buf, ssize_t count) {
static inline int readfromsocket(FILE * sockf, void *buf, ssize_t count) {
ssize_t i = fread_unlocked(buf, sizeof(byte), count, sockf);
if(i != count) {
if(feof(sockf)) {
@ -101,7 +101,7 @@ static inline int readfromsocket(int sockd, void *buf, ssize_t count)
}
static inline int fwritetosocket(FILE * sockf, const void *buf, ssize_t count) {
static inline int writetosocket(FILE * sockf, const void *buf, ssize_t count) {
ssize_t i = fwrite_unlocked((byte*)buf, sizeof(byte), count, sockf);
if(i != count) {
if(feof(sockf)) {
@ -147,7 +147,7 @@ static inline bool opisresponse(network_op_t op) {
return (LOGSTORE_FIRST_RESPONSE_CODE <= op && op <= LOGSTORE_LAST_RESPONSE_CODE);
}
static inline network_op_t freadopfromsocket(FILE * sockf, logstore_opcode_type type) {
static inline network_op_t readopfromsocket(FILE * sockf, logstore_opcode_type type) {
network_op_t ret;
ssize_t n = fread(&ret, sizeof(network_op_t), 1, sockf);
if(n == sizeof(network_op_t)) {
@ -221,9 +221,9 @@ static inline network_op_t readopfromsocket(int sockd, logstore_opcode_type type
}
return ret;
}
static inline int fwriteoptosocket(FILE * sockf, network_op_t op) {
static inline int writeoptosocket(FILE * sockf, network_op_t op) {
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
return fwritetosocket(sockf, &op, sizeof(network_op_t));
return writetosocket(sockf, &op, sizeof(network_op_t));
}
static inline int writeoptosocket(int sockd, network_op_t op) {
assert(opiserror(op) || opisrequest(op) || opisresponse(op));
@ -241,18 +241,18 @@ static inline int writeoptosocket(int sockd, network_op_t op) {
*/
static inline datatuple* freadtuplefromsocket(FILE * sockf, int * err) {
static inline datatuple* readtuplefromsocket(FILE * sockf, int * err) {
len_t keylen, datalen, buflen;
if(( *err = freadfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL;
if(( *err = readfromsocket(sockf, &keylen, sizeof(keylen)) )) return NULL;
if(keylen == DELETE) return NULL; // *err is zero.
if(( *err = freadfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL;
if(( *err = readfromsocket(sockf, &datalen, sizeof(datalen)) )) return NULL;
buflen = datatuple::length_from_header(keylen, datalen);
byte* bytes = (byte*) malloc(buflen);
if(( *err = freadfromsocket(sockf, bytes, buflen) )) return NULL;
if(( *err = readfromsocket(sockf, bytes, buflen) )) return NULL;
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
}
@ -278,24 +278,24 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) {
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
}
static inline int fwriteendofiteratortosocket(FILE * sockf) {
return fwritetosocket(sockf, &DELETE, sizeof(DELETE));
static inline int writeendofiteratortosocket(FILE * sockf) {
return writetosocket(sockf, &DELETE, sizeof(DELETE));
}
static inline int writeendofiteratortosocket(int sockd) {
return writetosocket(sockd, &DELETE, sizeof(DELETE));
}
static inline int fwritetupletosocket(FILE * sockf, const datatuple *tup) {
static inline int writetupletosocket(FILE * sockf, const datatuple *tup) {
len_t keylen, datalen;
int err;
if(tup == NULL) {
if(( err = fwriteendofiteratortosocket(sockf) )) return err;
if(( err = writeendofiteratortosocket(sockf) )) return err;
} else {
const byte* buf = tup->get_bytes(&keylen, &datalen);
if(( err = fwritetosocket(sockf, &keylen, sizeof(keylen)) )) return err;
if(( err = fwritetosocket(sockf, &datalen, sizeof(datalen)) )) return err;
if(( err = fwritetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err;
if(( err = writetosocket(sockf, &keylen, sizeof(keylen)) )) return err;
if(( err = writetosocket(sockf, &datalen, sizeof(datalen)) )) return err;
if(( err = writetosocket(sockf, buf, datatuple::length_from_header(keylen, datalen)) )) return err;
}
return 0;
}
@ -314,9 +314,9 @@ static inline int writetupletosocket(int sockd, const datatuple* tup) {
return 0;
}
static inline uint64_t freadcountfromsocket(FILE* sockf, int *err) {
static inline uint64_t readcountfromsocket(FILE* sockf, int *err) {
uint64_t ret;
*err = freadfromsocket(sockf, &ret, sizeof(ret));
*err = readfromsocket(sockf, &ret, sizeof(ret));
return ret;
}
static inline uint64_t readcountfromsocket(int sockd, int *err) {
@ -324,8 +324,8 @@ static inline uint64_t readcountfromsocket(int sockd, int *err) {
*err = readfromsocket(sockd, &ret, sizeof(ret));
return ret;
}
static inline int fwritecounttosocket(FILE* sockf, uint64_t count) {
return fwritetosocket(sockf, &count, sizeof(count));
static inline int writecounttosocket(FILE* sockf, uint64_t count) {
return writetosocket(sockf, &count, sizeof(count));
}
static inline int writecounttosocket(int sockd, uint64_t count) {
return writetosocket(sockd, &count, sizeof(count));

76
newserver.cpp Normal file
View file

@ -0,0 +1,76 @@
#include <stasis/transactional.h>
#undef end
#undef try
#undef begin
#include "merger.h"
#include "logstore.h"
#include "simpleServer.h"
/*
* newserver.cpp
*
* Created on: Aug 11, 2010
* Author: sears
*/
int main(int argc, char *argv[])
{
logtable<datatuple>::init_stasis();
int xid = Tbegin();
merge_scheduler * mscheduler = new merge_scheduler;
logtable<datatuple> ltable;
recordid table_root = ROOT_RECORD;
int64_t c0_size = 1024 * 1024 * 512 * 1;
if(argc == 2 && !strcmp(argv[1], "--test")) {
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
}
if(argc == 2 && !strcmp(argv[1], "--benchmark")) {
c0_size = 1024 * 1024 * 1024 * 1;
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
}
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
printf("Creating empty logstore\n");
table_root = ltable.allocTable(xid);
assert(table_root.page == ROOT_RECORD.page &&
table_root.slot == ROOT_RECORD.slot);
} else {
printf("Opened existing logstore\n");
table_root.size = TrecordSize(xid, ROOT_RECORD);
ltable.openTable(xid, table_root);
}
Tcommit(xid);
int lindex = mscheduler->addlogtable(&ltable);
ltable.setMergeData(mscheduler->getMergeData(lindex));
mscheduler->startlogtable(lindex, c0_size);
simpleServer *lserver = new simpleServer(&ltable);
lserver->acceptLoop();
printf ("Stopping server...\n");
delete lserver;
printf("Stopping merge threads...\n");
mscheduler->shutdown();
delete mscheduler;
printf("Deinitializing stasis...\n");
fflush(stdout);
logtable<datatuple>::deinit_stasis();
printf("Shutdown complete\n");
}

View file

@ -7,13 +7,15 @@
#include "requestDispatch.h"
#include "regionAllocator.h"
static inline int op_insert(logtable<datatuple> * ltable, int fd, datatuple * tuple) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_insert(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
//insert/update/delete
ltable->insertTuple(tuple);
//step 4: send response
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_find(logtable<datatuple> * ltable, int fd, datatuple * tuple) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
//find the tuple
datatuple *dt = ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
@ -57,7 +59,8 @@ static inline int op_find(logtable<datatuple> * ltable, int fd, datatuple * tupl
}
return err;
}
static inline int op_scan(logtable<datatuple> * ltable, int fd, datatuple * tuple, datatuple * tuple2, size_t limit) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_scan(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit) {
size_t count = 0;
int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES);
@ -81,15 +84,18 @@ static inline int op_scan(logtable<datatuple> * ltable, int fd, datatuple * tupl
if(!err) { writeendofiteratortosocket(fd); }
return err;
}
static inline int op_flush(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_flush(logtable<datatuple> * ltable, HANDLE fd) {
ltable->flushTable();
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_shutdown(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_shutdown(logtable<datatuple> * ltable, HANDLE fd) {
// XXX
return writeoptosocket(fd, LOGSTORE_UNIMPLEMENTED_ERROR);
}
static inline int op_stat_space_usage(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_stat_space_usage(logtable<datatuple> * ltable, HANDLE fd) {
int xid = Tbegin();
@ -186,12 +192,14 @@ static inline int op_stat_space_usage(logtable<datatuple> * ltable, int fd) {
return err;
}
static inline int op_stat_perf_report(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_stat_perf_report(logtable<datatuple> * ltable, HANDLE fd) {
}
static inline int op_stat_histogram(logtable<datatuple> * ltable, int fd, size_t limit) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_stat_histogram(logtable<datatuple> * ltable, HANDLE fd, size_t limit) {
if(limit < 3) {
return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
@ -248,7 +256,8 @@ static inline int op_stat_histogram(logtable<datatuple> * ltable, int fd, size_t
Tcommit(xid);
return err;
}
static inline int op_dbg_blockmap(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_dbg_blockmap(logtable<datatuple> * ltable, HANDLE fd) {
// produce a list of stasis regions
int xid = Tbegin();
@ -351,7 +360,8 @@ static inline int op_dbg_blockmap(logtable<datatuple> * ltable, int fd) {
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_dbg_drop_database(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_dbg_drop_database(logtable<datatuple> * ltable, HANDLE fd) {
logtable<datatuple>::iterator * itr = new logtable<datatuple>::iterator(ltable);
datatuple * del;
fprintf(stderr, "DROPPING DATABASE...\n");
@ -376,11 +386,50 @@ static inline int op_dbg_drop_database(logtable<datatuple> * ltable, int fd) {
fprintf(stderr, "...DROP DATABASE COMPLETE\n");
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
static inline int op_dbg_noop(logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
inline int requestDispatch<HANDLE>::op_dbg_noop(logtable<datatuple> * ltable, HANDLE fd) {
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
}
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * ltable, int fd) {
template<class HANDLE>
int requestDispatch<HANDLE>::dispatch_request(HANDLE f, logtable<datatuple>*ltable) {
//step 1: read the opcode
network_op_t opcode = readopfromsocket(f, LOGSTORE_CLIENT_REQUEST);
if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
opcode = OP_DONE;
printf("Broken client closed connection uncleanly\n");
}
int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure
//step 2: read the first tuple from client
datatuple *tuple = 0, *tuple2 = 0;
if(!err) { tuple = readtuplefromsocket(f, &err); }
// read the second tuple from client
if(!err) { tuple2 = readtuplefromsocket(f, &err); }
//step 3: process the tuple
if(!err) { err = dispatch_request(opcode, tuple, tuple2, ltable, f); }
//free the tuple
if(tuple) datatuple::freetuple(tuple);
if(tuple2) datatuple::freetuple(tuple2);
// Deal with old work_queue item by freeing it or putting it back in the queue.
if(err) {
if(opcode != OP_DONE) {
perror("network error. conn closed");
} else {
// printf("client done. conn closed. (%d, %d)\n",
// *(item->data->workitem), item->data->work_queue->size());
}
}
return err;
}
template<class HANDLE>
int requestDispatch<HANDLE>::dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * ltable, HANDLE fd) {
int err = 0;
if(opcode == OP_INSERT)
{
@ -429,3 +478,6 @@ int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2,
}
return err;
}
template class requestDispatch<int>;
template class requestDispatch<FILE*>;

View file

@ -10,7 +10,23 @@
#include "network.h"
#include "datatuple.h"
#include "logstore.h"
template<class HANDLE>
class requestDispatch {
private:
static inline int op_insert(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple);
static inline int op_find(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple);
static inline int op_scan(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit);
static inline int op_flush(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_shutdown(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_stat_space_usage(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_stat_perf_report(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_stat_histogram(logtable<datatuple> * ltable, HANDLE fd, size_t limit);
static inline int op_dbg_blockmap(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_dbg_drop_database(logtable<datatuple> * ltable, HANDLE fd);
static inline int op_dbg_noop(logtable<datatuple> * ltable, HANDLE fd);
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * logstore, int fd);
public:
static int dispatch_request(HANDLE f, logtable<datatuple> * ltable);
static int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable<datatuple> * ltable, HANDLE fd);
};
#endif /* REQUESTDISPATCH_H_ */

146
simpleServer.cpp Normal file
View file

@ -0,0 +1,146 @@
/*
* simpleServer.cpp
*
* Created on: Aug 11, 2010
* Author: sears
*/
#include "simpleServer.h"
#include "requestDispatch.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
typedef struct {
simpleServer * obj;
int self;
} worker_arg;
void * worker_wrap(void * arg) {
((worker_arg*)arg)->obj->worker(((worker_arg*)arg)->self);
free(arg);
return 0;
}
void * simpleServer::worker(int self) {
while(true) {
pthread_mutex_lock(&thread_mut[self]);
thread_fd[self] = -1;
while(thread_fd[self] == -1) {
if(!running) { return 0; }
pthread_cond_wait(&thread_cond[self], &thread_mut[self]);
}
pthread_mutex_unlock(&thread_mut[self]);
FILE * f = fdopen(thread_fd[self], "a+");
while(!requestDispatch<FILE*>::dispatch_request(f, ltable)) { }
fclose(f);
}
}
simpleServer::simpleServer(logtable<datatuple> * ltable, int max_threads, int port):
ltable(ltable),
port(port),
max_threads(max_threads),
thread_fd((int*)malloc(sizeof(*thread_fd)*max_threads)),
thread_cond((pthread_cond_t*)malloc(sizeof(*thread_cond)*max_threads)),
thread_mut((pthread_mutex_t*)malloc(sizeof(*thread_mut)*max_threads)),
thread((pthread_t*)malloc(sizeof(*thread)*max_threads)),
running(true) {
for(int i = 0; i < max_threads; i++) {
thread_fd[i] = -1;
pthread_cond_init(&thread_cond[i], 0);
pthread_mutex_init(&thread_mut[i], 0);
worker_arg * arg = (worker_arg*)malloc(sizeof(worker_arg));
arg->obj = this;
arg->self = i;
pthread_create(&thread[i], 0, worker_wrap, (void*)arg);
}
}
bool simpleServer::acceptLoop() {
int sockfd;
struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr;
int newsockfd;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if(sockfd == -1) {
perror("ERROR opening socket");
return false;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); // XXX security...
serv_addr.sin_port = htons(port);
if(bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) == -1) {
perror("ERROR on binding");
return false;
}
if(listen(sockfd,SOMAXCONN)==-1) {
perror("ERROR on listen");
return false;
}
printf("LSM Server listening....\n");
// *(sdata->server_socket) = sockfd;
int flag, result;
while(true) {
socklen_t clilen = sizeof(cli_addr);
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if(newsockfd == -1) {
perror("ERROR on accept");
} else {
flag = 1;
result = setsockopt(newsockfd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(int));
if(result == -1) {
perror("ERROR on setting socket option TCP_NODELAY");
// ignore the error.
}
// char clientip[20];
// inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
// printf("Connection from %s\n", clientip);
int i;
for(i = 0; i < max_threads; i++) { //TODO round robin or something? (currently, we don't care if connect() is slow...)
pthread_mutex_lock(&thread_mut[i]);
if(thread_fd[i] == -1) {
thread_fd[i] = newsockfd;
DEBUG("connect %d\n", i);
pthread_cond_signal(&thread_cond[i]);
pthread_mutex_unlock(&thread_mut[i]);
break;
} else {
pthread_mutex_unlock(&thread_mut[i]);
}
}
if(i == max_threads) {
printf("all threads are busy. rejecting connection\n");
close(newsockfd);
}
}
}
}
simpleServer::~simpleServer() {
running = false;
for(int i = 0; i < max_threads; i++) {
pthread_cond_signal(&thread_cond[i]);
pthread_join(thread[i],0);
pthread_mutex_destroy(&thread_mut[i]);
pthread_cond_destroy(&thread_cond[i]);
}
free(thread);
free(thread_mut);
free(thread_cond);
free(thread_fd);
}

32
simpleServer.h Normal file
View file

@ -0,0 +1,32 @@
/*
* simpleServer.h
*
* Created on: Aug 11, 2010
* Author: sears
*/
#ifndef SIMPLESERVER_H_
#define SIMPLESERVER_H_
#include "logstore.h"
class simpleServer {
public:
void* worker(int /*handle*/);
static const int DEFAULT_PORT = 32432;
static const int DEFAULT_THREADS = 1000;
simpleServer(logtable<datatuple> * ltable, int max_threads = DEFAULT_THREADS, int port = DEFAULT_PORT);
bool acceptLoop();
~simpleServer();
private:
logtable<datatuple>* ltable;
int port;
int max_threads;
int * thread_fd;
pthread_cond_t * thread_cond;
pthread_mutex_t * thread_mut;
pthread_t * thread;
bool running;
};
#endif /* SIMPLESERVER_H_ */

View file

@ -107,22 +107,22 @@ logstore_client_op_returns_many(logstore_handle_t *l,
network_op_t err = 0;
//send the opcode
if( !err) { err = fwritetosocket(l->server_fsocket, &opcode, sizeof(opcode)); }
if( !err) { err = writetosocket(l->server_fsocket, &opcode, sizeof(opcode)); }
//send the first tuple
if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple); }
if( !err) { err = writetupletosocket(l->server_fsocket, tuple); }
//send the second tuple
if( !err) { err = fwritetupletosocket(l->server_fsocket, tuple2); }
if( !err) { err = writetupletosocket(l->server_fsocket, tuple2); }
if( (!err) && (count != (uint64_t)-1) ) {
err = fwritecounttosocket(l->server_fsocket, count); }
err = writecounttosocket(l->server_fsocket, count); }
fflush_unlocked(l->server_fsocket);
network_op_t rcode = LOGSTORE_CONN_CLOSED_ERROR;
if( !err) {
rcode = freadopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE);
rcode = readopfromsocket(l->server_fsocket,LOGSTORE_SERVER_RESPONSE);
}
if( opiserror(rcode) ) { close_conn(l); }
@ -134,7 +134,7 @@ datatuple *
logstore_client_next_tuple(logstore_handle_t *l) {
assert(l->server_fsocket != 0); // otherwise, then the client forgot to check a return value...
int err = 0;
datatuple * ret = freadtuplefromsocket(l->server_fsocket, &err);
datatuple * ret = readtuplefromsocket(l->server_fsocket, &err);
if(err) {
close_conn(l);
if(ret) {
@ -179,7 +179,7 @@ logstore_client_op(logstore_handle_t *l,
int logstore_client_close(logstore_handle_t* l) {
if(l->server_fsocket)
{
fwritetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t));
writetosocket(l->server_fsocket, (char*) &OP_DONE, sizeof(uint8_t));
fclose(l->server_fsocket);
DEBUG("socket closed %d\n.", l->server_fsocket);