remove a bunch of whitespace / comments. clean up network thread. add constant for dumping the block map
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@570 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
478d394365
commit
e2ea48d233
4 changed files with 71 additions and 143 deletions
173
logserver.cpp
173
logserver.cpp
|
@ -63,8 +63,6 @@ void logserver::startserver(logtable *ltable)
|
|||
worker_data->workitem = new int;
|
||||
*(worker_data->workitem) = -1;
|
||||
|
||||
//worker_data->table_lock = lsmlock;
|
||||
|
||||
worker_data->ltable = ltable;
|
||||
|
||||
worker_data->sys_alive = &sys_alive;
|
||||
|
@ -72,12 +70,8 @@ void logserver::startserver(logtable *ltable)
|
|||
pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th);
|
||||
|
||||
idleth_queue.push(*worker_th);
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
//start server socket
|
||||
sdata = new serverth_data;
|
||||
sdata->server_socket = &serversocket;
|
||||
|
@ -192,8 +186,6 @@ void logserver::stopserver()
|
|||
printf("tot time / num reqs:\t%.3f\n", tot_time / num_reqs );
|
||||
}
|
||||
#endif
|
||||
|
||||
//close(serversocket);
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -216,20 +208,14 @@ void logserver::eventLoop()
|
|||
|
||||
ts.tv_nsec = 250000; //nanosec
|
||||
ts.tv_sec = 0;
|
||||
|
||||
//Timeout.tv_usec = 250; /* microseconds */
|
||||
//Timeout.tv_sec = 0; /* seconds */
|
||||
|
||||
//update select set
|
||||
pthread_mutex_lock(qlock);
|
||||
|
||||
//while(ready_queue.size() == 0)
|
||||
if(sel_list.size() == 0)
|
||||
{
|
||||
while(ready_queue.size() == 0)
|
||||
pthread_cond_wait(selcond, qlock);
|
||||
//pthread_cond_timedwait(selcond, qlock, &ts);
|
||||
//printf("awoke\n");
|
||||
}
|
||||
|
||||
//new connections + processed conns are in ready_queue
|
||||
|
@ -252,10 +238,6 @@ void logserver::eventLoop()
|
|||
|
||||
//select events
|
||||
int sel_res = select(maxfd+1, &readfs, NULL, NULL, NULL);// &Timeout);
|
||||
//printf("sel_res %d %d\n", sel_res, errno);
|
||||
//fflush(stdout);
|
||||
//job assignment to threads
|
||||
//printf("sel_list size:\t%d ready_cnt\t%d\n", sel_list.size(), sel_res);
|
||||
|
||||
#ifdef STATS_ENABLED
|
||||
if(num_selcalls == 0)
|
||||
|
@ -272,8 +254,6 @@ void logserver::eventLoop()
|
|||
|
||||
if (FD_ISSET(currsock, &readfs))
|
||||
{
|
||||
//printf("sock %d ready\n", currsock);
|
||||
// pthread_mutex_lock(qlock);
|
||||
|
||||
if(idleth_queue.size() > 0) //assign the job to an indle thread
|
||||
{
|
||||
|
@ -286,16 +266,12 @@ void logserver::eventLoop()
|
|||
*(idle_th.data->workitem) = currsock;
|
||||
pthread_cond_signal(idle_th.data->th_cond);
|
||||
pthread_mutex_unlock(idle_th.data->th_mut);
|
||||
//printf("%d:\tconn %d assigned.\n", i, currsock);
|
||||
}
|
||||
else
|
||||
{
|
||||
//insert the given element to the work queue
|
||||
work_queue.push(currsock);
|
||||
//printf("work queue size:\t%d\n", work_queue.size());
|
||||
}
|
||||
|
||||
// pthread_mutex_unlock(qlock);
|
||||
|
||||
//remove from the sel_list
|
||||
sel_list.erase(sel_list.begin()+i);
|
||||
|
@ -323,7 +299,6 @@ void *serverLoop(void *args)
|
|||
struct sockaddr_in cli_addr;
|
||||
int newsockfd; //newly created
|
||||
socklen_t clilen = sizeof(cli_addr);
|
||||
|
||||
|
||||
//open a socket
|
||||
sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
|
@ -382,8 +357,6 @@ void *serverLoop(void *args)
|
|||
inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
|
||||
printf("Connection from:\t%s\n", clientip);
|
||||
|
||||
//printf("Number of idle threads %d\n", idleth_queue.size());
|
||||
|
||||
pthread_mutex_lock(sdata->qlock);
|
||||
|
||||
//insert the given element to the ready queue
|
||||
|
@ -393,16 +366,36 @@ void *serverLoop(void *args)
|
|||
pthread_cond_signal(sdata->selcond);
|
||||
|
||||
pthread_mutex_unlock(sdata->qlock);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
static void network_disconnect(pthread_item * item, bool iserror) {
|
||||
pthread_mutex_lock(item->data->qlock);
|
||||
if(iserror) {
|
||||
printf("network error. conn closed. (%d, %d, %d)\n",
|
||||
errno, *(item->data->workitem), item->data->work_queue->size());
|
||||
} else {
|
||||
printf("client done. conn closed. (%d, %d)\n",
|
||||
*(item->data->workitem), item->data->work_queue->size());
|
||||
}
|
||||
close(*(item->data->workitem));
|
||||
|
||||
if(item->data->work_queue->size() > 0)
|
||||
{
|
||||
int new_work = item->data->work_queue->front();
|
||||
item->data->work_queue->pop();
|
||||
*(item->data->workitem) = new_work;
|
||||
}
|
||||
else
|
||||
{
|
||||
//set work to -1
|
||||
*(item->data->workitem) = -1;
|
||||
//add self to idle queue
|
||||
item->data->idleth_queue->push(*item);
|
||||
}
|
||||
|
||||
|
||||
|
||||
pthread_mutex_unlock(item->data->qlock);
|
||||
}
|
||||
|
||||
void * thread_work_fn( void * args)
|
||||
{
|
||||
|
@ -427,7 +420,6 @@ void * thread_work_fn( void * args)
|
|||
|
||||
if(!*(item->data->sys_alive))
|
||||
{
|
||||
//printf("thread quitted.\n");
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -440,62 +432,28 @@ void * thread_work_fn( void * args)
|
|||
|
||||
if( opcode == OP_DONE || (opiserror(opcode))) //close the conn on failure
|
||||
{
|
||||
pthread_mutex_lock(item->data->qlock);
|
||||
if(opiserror(opcode)) {
|
||||
printf("network error. conn closed. (%d, %d, %d, %d)\n",
|
||||
opcode, errno, *(item->data->workitem), item->data->work_queue->size());
|
||||
} else {
|
||||
printf("client done. conn closed. (%d, %d)\n",
|
||||
*(item->data->workitem), item->data->work_queue->size());
|
||||
}
|
||||
close(*(item->data->workitem));
|
||||
|
||||
if(item->data->work_queue->size() > 0)
|
||||
{
|
||||
int new_work = item->data->work_queue->front();
|
||||
item->data->work_queue->pop();
|
||||
//printf("work queue size:\t%d\n", item->data->work_queue->size());
|
||||
*(item->data->workitem) = new_work;
|
||||
}
|
||||
else
|
||||
{
|
||||
//set work to -1
|
||||
*(item->data->workitem) = -1;
|
||||
//add self to idle queue
|
||||
item->data->idleth_queue->push(*item);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(item->data->qlock);
|
||||
continue;
|
||||
network_disconnect(item, opiserror(opcode));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
//step 2: read the tuple from client
|
||||
datatuple * tuple = readtuplefromsocket(*(item->data->workitem));
|
||||
//step 3: process the tuple
|
||||
//pthread_mutex_lock(item->data->table_lock);
|
||||
//readlock(item->data->table_lock,0);
|
||||
|
||||
int err = 0;
|
||||
|
||||
if(opcode == OP_INSERT)
|
||||
{
|
||||
//insert/update/delete
|
||||
item->data->ltable->insertTuple(tuple);
|
||||
//unlock the lsmlock
|
||||
//pthread_mutex_unlock(item->data->table_lock);
|
||||
//unlock(item->data->table_lock);
|
||||
//step 4: send response
|
||||
int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS);
|
||||
if(err) {
|
||||
perror("could not respond to client");
|
||||
}
|
||||
err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS);
|
||||
}
|
||||
else if(opcode == OP_FIND)
|
||||
{
|
||||
//find the tuple
|
||||
datatuple *dt = item->data->ltable->findTuple(-1, tuple->key(), tuple->keylen());
|
||||
//unlock the lsmlock
|
||||
//pthread_mutex_unlock(item->data->table_lock);
|
||||
//unlock(item->data->table_lock);
|
||||
|
||||
#ifdef STATS_ENABLED
|
||||
|
||||
|
@ -524,55 +482,60 @@ void * thread_work_fn( void * args)
|
|||
|
||||
//send the reply code
|
||||
int err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
|
||||
|
||||
//send the tuple
|
||||
writetupletosocket(*(item->data->workitem), dt);
|
||||
if(!err) {
|
||||
//send the tuple
|
||||
err = writetupletosocket(*(item->data->workitem), dt);
|
||||
}
|
||||
|
||||
//free datatuple
|
||||
if(dt_needs_free) {
|
||||
datatuple::freetuple(dt);
|
||||
}
|
||||
}
|
||||
else if(opcode == OP_DBG_BLOCKMAP)
|
||||
{
|
||||
// produce a list of stasis regions
|
||||
|
||||
// produce a list of regions used by current tree components
|
||||
|
||||
|
||||
}
|
||||
|
||||
//free the tuple
|
||||
datatuple::freetuple(tuple);
|
||||
|
||||
//printf("socket %d: work completed.", *(item->data->workitem));
|
||||
|
||||
pthread_mutex_lock(item->data->qlock);
|
||||
|
||||
//add conn desc to ready queue
|
||||
item->data->ready_queue->push(*(item->data->workitem));
|
||||
//printf("ready queue size: %d sock(%d)\n", item->data->ready_queue->size(), *(item->data->workitem));
|
||||
if(item->data->ready_queue->size() == 1) //signal the event loop
|
||||
pthread_cond_signal(item->data->selcond);
|
||||
if(err) {
|
||||
perror("could not respond to client");
|
||||
network_disconnect(item, true);
|
||||
continue;
|
||||
} else {
|
||||
pthread_mutex_lock(item->data->qlock);
|
||||
|
||||
//printf("work complete, added to ready queue %d (size %d)\n", *(item->data->workitem),
|
||||
// item->data->ready_queue->size());
|
||||
|
||||
if(item->data->work_queue->size() > 0)
|
||||
{
|
||||
int new_work = item->data->work_queue->front();
|
||||
item->data->work_queue->pop();
|
||||
//printf("work queue size:\t%d\n", item->data->work_queue->size());
|
||||
*(item->data->workitem) = new_work;
|
||||
}
|
||||
else
|
||||
{
|
||||
//set work to -1
|
||||
*(item->data->workitem) = -1;
|
||||
//add self to idle queue
|
||||
item->data->idleth_queue->push(*item);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(item->data->qlock);
|
||||
//add conn desc to ready queue
|
||||
item->data->ready_queue->push(*(item->data->workitem));
|
||||
if(item->data->ready_queue->size() == 1) //signal the event loop
|
||||
pthread_cond_signal(item->data->selcond);
|
||||
|
||||
if(item->data->work_queue->size() > 0)
|
||||
{
|
||||
int new_work = item->data->work_queue->front();
|
||||
item->data->work_queue->pop();
|
||||
*(item->data->workitem) = new_work;
|
||||
}
|
||||
else
|
||||
{
|
||||
//set work to -1
|
||||
*(item->data->workitem) = -1;
|
||||
//add self to idle queue
|
||||
item->data->idleth_queue->push(*item);
|
||||
}
|
||||
pthread_mutex_unlock(item->data->qlock);
|
||||
}
|
||||
#ifdef STATS_ENABLED
|
||||
if( item->data->num_reqs == 0 )
|
||||
item->data->work_time = 0;
|
||||
gettimeofday(& (item->data->stop_tv), 0);
|
||||
(item->data->num_reqs)++;
|
||||
//item->data->work_time += tv_to_double(item->data->stop_tv) - tv_to_double(item->data->start_tv);
|
||||
item->data->work_time += (item->data->stop_tv.tv_sec - item->data->start_tv.tv_sec) * 1000 +
|
||||
(item->data->stop_tv.tv_usec / 1000 - item->data->start_tv.tv_usec / 1000);
|
||||
|
||||
|
@ -589,8 +552,6 @@ void * thread_work_fn( void * args)
|
|||
item->data->work_timec[clientkey] += (item->data->stop_tv.tv_sec - item->data->start_tv.tv_sec) * 1000 +
|
||||
(item->data->stop_tv.tv_usec / 1000 - item->data->start_tv.tv_usec / 1000);;
|
||||
#endif
|
||||
|
||||
|
||||
}
|
||||
pthread_mutex_unlock(item->data->th_mut);
|
||||
|
||||
|
|
35
logserver.h
35
logserver.h
|
@ -1,16 +1,11 @@
|
|||
#ifndef _LOGSERVER_H_
|
||||
#define _LOGSERVER_H_
|
||||
|
||||
|
||||
#include <queue>
|
||||
#include <vector>
|
||||
|
||||
//#include "logstore.h"
|
||||
|
||||
#include "datatuple.h"
|
||||
|
||||
|
||||
|
||||
#include <stasis/transactional.h>
|
||||
#include <pthread.h>
|
||||
|
||||
|
@ -28,8 +23,6 @@
|
|||
|
||||
class logtable;
|
||||
|
||||
|
||||
|
||||
struct pthread_item;
|
||||
|
||||
struct pthread_data {
|
||||
|
@ -45,8 +38,6 @@ struct pthread_data {
|
|||
|
||||
int *workitem; //id of the socket to work
|
||||
|
||||
//pthread_mutex_t * table_lock;
|
||||
//rwl *table_lock;
|
||||
logtable *ltable;
|
||||
bool *sys_alive;
|
||||
|
||||
|
@ -65,14 +56,6 @@ struct pthread_item{
|
|||
pthread_data *data;
|
||||
};
|
||||
|
||||
|
||||
//struct work_item
|
||||
//{
|
||||
// int sockd; //socket id
|
||||
// datatuple in_tuple; //request
|
||||
// datatuple out_tuple; //response
|
||||
//};
|
||||
|
||||
struct serverth_data
|
||||
{
|
||||
int *server_socket;
|
||||
|
@ -93,10 +76,6 @@ public:
|
|||
logserver(int nthreads, int server_port){
|
||||
this->nthreads = nthreads;
|
||||
this->server_port = server_port;
|
||||
//lsmlock = new pthread_mutex_t;
|
||||
//pthread_mutex_init(lsmlock,0);
|
||||
|
||||
//lsmlock = initlock();
|
||||
|
||||
qlock = new pthread_mutex_t;
|
||||
pthread_mutex_init(qlock,0);
|
||||
|
@ -113,8 +92,6 @@ public:
|
|||
|
||||
~logserver()
|
||||
{
|
||||
//delete lsmlock;
|
||||
//deletelock(lsmlock);
|
||||
delete qlock;
|
||||
}
|
||||
|
||||
|
@ -126,12 +103,7 @@ private:
|
|||
|
||||
//main loop of server
|
||||
//accept connections, assign jobs to threads
|
||||
//void dispatchLoop();
|
||||
|
||||
void eventLoop();
|
||||
|
||||
|
||||
private:
|
||||
|
||||
int server_port;
|
||||
|
||||
|
@ -141,10 +113,6 @@ private:
|
|||
|
||||
int serversocket; //server socket file descriptor
|
||||
|
||||
//ccqueue<int> conn_queue; //list of active connections (socket list)
|
||||
|
||||
//ccqueue<pthread_item> idleth_queue; //list of idle threads
|
||||
|
||||
std::queue<int> ready_queue; //connections to go inside select
|
||||
std::queue<int> work_queue; //connections to be processed by worker threads
|
||||
std::queue<pthread_item> idleth_queue;
|
||||
|
@ -156,11 +124,8 @@ private:
|
|||
|
||||
std::vector<pthread_item *> th_list; // list of threads
|
||||
|
||||
//rwl *lsmlock; //lock for using lsm table
|
||||
|
||||
logtable *ltable;
|
||||
|
||||
|
||||
#ifdef STATS_ENABLED
|
||||
int num_reqs;
|
||||
int num_selevents;
|
||||
|
|
|
@ -177,7 +177,6 @@ public:
|
|||
|
||||
void insertTuple(struct datatuple *tuple);
|
||||
|
||||
|
||||
//other class functions
|
||||
recordid allocTable(int xid);
|
||||
|
||||
|
|
|
@ -26,7 +26,10 @@ static const network_op_t OP_INSERT = 8; // Create, Update, Delet
|
|||
static const network_op_t OP_FIND = 9; // Read
|
||||
|
||||
static const network_op_t OP_DONE = 10; // Please close the connection.
|
||||
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 10;
|
||||
|
||||
static const network_op_t OP_DBG_BLOCKMAP = 11;
|
||||
|
||||
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 11;
|
||||
|
||||
//error codes
|
||||
static const network_op_t LOGSTORE_FIRST_ERROR = 28;
|
||||
|
|
Loading…
Reference in a new issue