#include "logserver.h"
#include "datatuple.h"
#include "merger.h"
#include "logstore.h"
#include "network.h"

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>

#include <stasis/operations/regions.h>

#include <unistd.h>
#include <fcntl.h>

#undef begin
#undef end
#undef try

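/*
 * Threading model (as implemented below):
 *  - serverLoop() runs in its own thread; it accepts TCP connections and
 *    pushes new client sockets onto the ready queue.
 *  - logserver::eventLoop() select()s over all idle client sockets plus the
 *    read end of a self-pipe; when a socket becomes readable it is handed to
 *    an idle worker (or parked in work_queue if every worker is busy).
 *  - A fixed pool of nthreads workers runs thread_work_fn(); each worker
 *    handles one request at a time, then either returns the socket to the
 *    ready queue or closes it.
 *  - The self-pipe exists so that the accept thread or a worker can wake
 *    eventLoop() out of select() after adding a socket to the ready queue.
 */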
void *serverLoop(void *args);

void logserver::startserver(logtable *ltable)
{
    sys_alive = true;
    this->ltable = ltable;

    selcond = new pthread_cond_t;
    pthread_cond_init(selcond, 0);

    self_pipe = (int*)malloc(2 * sizeof(int));
    if(pipe(self_pipe)) {
        perror("Couldn't create self-pipe!");
        abort();
    }

    fcntl(self_pipe[0], F_SETFL, O_NONBLOCK);
    fcntl(self_pipe[1], F_SETFL, O_NONBLOCK);

    //initialize worker threads
    for(size_t i=0; i<nthreads; i++)
    {
        struct pthread_item *worker_th = new pthread_item;
        th_list.push_back(worker_th);

        worker_th->th_handle = new pthread_t;
        struct pthread_data *worker_data = new pthread_data;
        worker_th->data = worker_data;

        worker_data->idleth_queue = &idleth_queue;
        worker_data->ready_queue = &ready_queue;
        worker_data->work_queue = &work_queue;

#ifdef STATS_ENABLED
        worker_data->num_reqs = 0;
#endif

        worker_data->qlock = qlock;

        worker_data->selcond = selcond;
        worker_data->self_pipe = self_pipe;

        worker_data->th_cond = new pthread_cond_t;
        pthread_cond_init(worker_data->th_cond, 0);

        worker_data->th_mut = new pthread_mutex_t;
        pthread_mutex_init(worker_data->th_mut, 0);

        worker_data->workitem = new int;
        *(worker_data->workitem) = -1;

        worker_data->ltable = ltable;

        worker_data->sys_alive = &sys_alive;

        pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th);

        idleth_queue.push(*worker_th);
    }

    //start server socket
    sdata = new serverth_data;
    sdata->server_socket = &serversocket;
    sdata->server_port = server_port;
    sdata->idleth_queue = &idleth_queue;
    sdata->ready_queue = &ready_queue;
    sdata->selcond = selcond;
    sdata->self_pipe = self_pipe;
    sdata->qlock = qlock;

    pthread_create(&server_thread, 0, serverLoop, sdata);

    //start monitoring loop
    eventLoop();
}

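// Shut down in stages: shut down the listening socket, wait for every worker
// to drain back into the idle queue, clear sys_alive, then wake and join each
// worker before freeing its per-thread state and the self-pipe.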
void logserver::stopserver()
{
    //close the server socket
    //stops receiving data on the server socket
    shutdown(serversocket, 0);

    //wait for all threads to be idle
    while(idleth_queue.size() != nthreads)
        sleep(1);

#ifdef STATS_ENABLED
    printf("\n\nSTATISTICS\n");
    std::map<std::string, int> num_reqsc;
    std::map<std::string, double> work_timec;
#endif

    //set the system running flag to false
    sys_alive = false;
    for(size_t i=0; i<nthreads; i++)
    {
        pthread_item *idle_th = th_list[i];

        //wake up the thread
        pthread_mutex_lock(idle_th->data->th_mut);
        pthread_cond_signal(idle_th->data->th_cond);
        pthread_mutex_unlock(idle_th->data->th_mut);
        //wait for it to join
        pthread_join(*(idle_th->th_handle), 0);
        //free the thread variables
        pthread_cond_destroy(idle_th->data->th_cond);

#ifdef STATS_ENABLED
        if(i == 0)
        {
            tot_threadwork_time = 0;
            num_reqs = 0;
        }

        tot_threadwork_time += idle_th->data->work_time;
        num_reqs += idle_th->data->num_reqs;

        printf("thread %zu: work_time %.3f\t#calls %d\tavg req process time:\t%.3f\n",
               i,
               idle_th->data->work_time,
               idle_th->data->num_reqs,
               (( idle_th->data->num_reqs == 0 ) ? 0 : idle_th->data->work_time / idle_th->data->num_reqs));

        for(std::map<std::string, int>::const_iterator itr = idle_th->data->num_reqsc.begin();
            itr != idle_th->data->num_reqsc.end(); itr++)
        {
            std::string ckey = (*itr).first;
            printf("\t%s\t%d\t%.3f\t%.3f\n", ckey.c_str(), (*itr).second, idle_th->data->work_timec[ckey],
                   idle_th->data->work_timec[ckey] / (*itr).second);

            if(num_reqsc.find(ckey) == num_reqsc.end()){
                num_reqsc[ckey] = 0;
                work_timec[ckey] = 0;
            }
            num_reqsc[ckey] += (*itr).second;
            work_timec[ckey] += idle_th->data->work_timec[ckey];
        }
#endif

        delete idle_th->data->th_cond;
        delete idle_th->data->th_mut;
        delete idle_th->data->workitem;
        delete idle_th->data;
        delete idle_th->th_handle;
    }

    th_list.clear();

    close(self_pipe[0]);
    close(self_pipe[1]);
    free(self_pipe);

#ifdef STATS_ENABLED
    printf("\n\nAggregated Stats:\n");
    for(std::map<std::string, int>::const_iterator itr = num_reqsc.begin();
        itr != num_reqsc.end(); itr++)
    {
        std::string ckey = (*itr).first;
        printf("\t%s\t%d\t%.3f\t%.3f\n", ckey.c_str(), (*itr).second, work_timec[ckey],
               work_timec[ckey] / (*itr).second);
    }

    tot_time = (stop_tv.tv_sec - start_tv.tv_sec) * 1000 +
               (stop_tv.tv_usec / 1000 - start_tv.tv_usec / 1000);

    printf("\ntot time:\t%f\n", tot_time);
    printf("tot work time:\t%f\n", tot_threadwork_time);
    printf("load avg:\t%f\n", tot_threadwork_time / tot_time);

    printf("tot num reqs\t%d\n", num_reqs);
    if(num_reqs != 0)
    {
        printf("tot work time / num reqs:\t%.3f\n", tot_threadwork_time / num_reqs);
        printf("tot time / num reqs:\t%.3f\n", tot_time / num_reqs);
    }
#endif

    return;
}

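// Main monitoring loop. self_pipe[0] stays in the select set permanently, so
// a write to self_pipe[1] (from serverLoop or a worker) interrupts select()
// whenever the ready queue gains a socket while we are blocked here.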
void logserver::eventLoop()
{
    fd_set readfs;
    int maxfd;

    struct timespec ts;
    std::vector<int> sel_list;
    sel_list.push_back(self_pipe[0]);

    // struct timeval no_timeout = { 0, 0 };

    while(true)
    {
        //clear readset
        FD_ZERO(&readfs);
        maxfd = -1;

        ts.tv_nsec = 250000; //nanosec
        ts.tv_sec = 0;

        //update select set
        pthread_mutex_lock(qlock);

        assert(sel_list.size() != 0); // self_pipe[0] should always be there.
        if(sel_list.size() == 1)
        {
            assert(sel_list[0] == self_pipe[0]);
            while(ready_queue.size() == 0)
                pthread_cond_wait(selcond, qlock);
        }

        //new connections + processed conns are in ready_queue
        //add them to select list
        while(ready_queue.size() > 0)
        {
            sel_list.push_back(ready_queue.front());
            ready_queue.pop();
        }
        pthread_mutex_unlock(qlock);

        //ready select set
        for(std::vector<int>::const_iterator itr=sel_list.begin();
            itr != sel_list.end(); itr++)
        {
            if(maxfd < *itr)
                maxfd = *itr;
            FD_SET(*itr, &readfs);
        }

        //select events
        int sel_res = select(maxfd+1, &readfs, NULL, NULL, NULL); //&no_timeout);// &Timeout);

#ifdef STATS_ENABLED
        if(num_selcalls == 0)
            gettimeofday(&start_tv, 0);

        num_selevents += sel_res;
        num_selcalls++;
#endif

        pthread_mutex_lock(qlock);
        for(size_t i=0; i<sel_list.size(); i++ )
        {
            int currsock = sel_list[i];
            DEBUG("processing request from currsock = %d\n", currsock);
            if (FD_ISSET(currsock, &readfs))
            {
                if(currsock == self_pipe[0]) {
                    DEBUG("currsock = %d is self_pipe\n", currsock);
                    //drain the (non-blocking) self-pipe
                    char gunk;
                    int n;
                    while(1 == (n = read(self_pipe[0], &gunk, 1))) { }
                    if(n == -1) {
                        if(errno != EAGAIN) {
                            perror("Couldn't read self_pipe!");
                            abort();
                        }
                    } else {
                        assert(n == 1);
                    }
                } else {
                    if(idleth_queue.size() > 0) //assign the job to an idle thread
                    {
                        DEBUG("push currsock = %d onto idleth\n", currsock); fflush(stdout);
                        pthread_item idle_th = idleth_queue.front();
                        idleth_queue.pop();

                        //wake up the thread to do work
                        pthread_mutex_lock(idle_th.data->th_mut);
                        //set the job of the idle thread
                        assert(currsock != -1);
                        *(idle_th.data->workitem) = currsock;
                        pthread_cond_signal(idle_th.data->th_cond);
                        pthread_mutex_unlock(idle_th.data->th_mut);
                    }
                    else
                    {
                        DEBUG("push currsock = %d onto workqueue\n", currsock); fflush(stdout);
                        //insert the given element to the work queue
                        work_queue.push(currsock);
                    }
                }

                //remove from the sel_list
                if(currsock != self_pipe[0]) {
                    sel_list.erase(sel_list.begin()+i);
                    i--;
                }
            } else {
                DEBUG("not set\n");
            }
        }

        pthread_mutex_unlock(qlock);

#ifdef STATS_ENABLED
        gettimeofday(&stop_tv, 0);
#endif
    }
}

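// Listener thread: binds the server port, accepts connections, disables
// Nagle's algorithm on each client socket (TCP_NODELAY) so small
// request/response messages are not delayed, and hands the socket to the
// event loop via the ready queue.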
void *serverLoop(void *args)
{
    serverth_data *sdata = (serverth_data*)args;

    int sockfd; //socket descriptor
    struct sockaddr_in serv_addr;
    struct sockaddr_in cli_addr;
    int newsockfd; //newly created
    socklen_t clilen = sizeof(cli_addr);

    //open a socket
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0)
    {
        printf("ERROR opening socket\n");
        return 0;
    }

    bzero((char *) &serv_addr, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(sdata->server_port);

    if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
    {
        printf("ERROR on binding.\n");
        return 0;
    }

    //start listening on the server socket
    //second arg is the max number of connections waiting in queue
    if(listen(sockfd, SOMAXCONN) == -1)
    {
        printf("ERROR on listen.\n");
        return 0;
    }

    printf("LSM Server listening...\n");

    *(sdata->server_socket) = sockfd;
    int flag, result;
    while(true)
    {
        newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
        if (newsockfd < 0)
        {
            printf("ERROR on accept.\n");
            return 0; // we probably want to continue instead of return here (when not debugging)
        }

        flag = 1;
        result = setsockopt(newsockfd,      /* socket affected */
                            IPPROTO_TCP,    /* set option at TCP level */
                            TCP_NODELAY,    /* name of option */
                            (char *) &flag, /* the cast is historical cruft */
                            sizeof(int));   /* length of option value */
        if (result < 0)
        {
            printf("ERROR on setting socket option TCP_NODELAY.\n");
            return 0;
        }

        char clientip[20];
        inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
        // printf("Connection from:\t%s\n", clientip);

        pthread_mutex_lock(sdata->qlock);

        //insert the given element to the ready queue
        sdata->ready_queue->push(newsockfd);

        if(sdata->ready_queue->size() == 1) { // signal the event loop
            pthread_cond_signal(sdata->selcond);
            char gunk = 42;
            int ret;
            if(-1 == (ret = write(sdata->self_pipe[1], &gunk, 1))) {
                if(errno != EAGAIN) {
                    perror("Couldn't write to pipe!");
                    abort();
                }
            } else {
                assert(ret == 1);
            }
        }
        pthread_mutex_unlock(sdata->qlock);
    }
}

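// Request handlers. Each op_*() below performs one operation against
// data->ltable and writes its reply to the client socket (*data->workitem);
// a nonzero return value tells the worker loop to tear the connection down.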
int op_insert(pthread_data* data, datatuple * tuple) {
    //insert/update/delete
    data->ltable->insertTuple(tuple);
    //step 4: send response
    return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}

int op_find(pthread_data* data, datatuple * tuple) {
    //find the tuple
    datatuple *dt = data->ltable->findTuple(-1, tuple->key(), tuple->keylen());

#ifdef STATS_ENABLED
    if(dt == 0) {
        DEBUG("key not found:\t%s\n", datatuple::key_to_str(tuple->key()).c_str());
    } else if( dt->datalen() != 1024) {
        DEBUG("data len for\t%s:\t%d\n", datatuple::key_to_str(tuple->key()).c_str(),
              dt->datalen());
        if(datatuple::compare(tuple->key(), tuple->keylen(), dt->key(), dt->keylen()) != 0) {
            DEBUG("key not equal:\t%s\t%s\n", datatuple::key_to_str(tuple->key()).c_str(),
                  datatuple::key_to_str(dt->key()).c_str());
        }
    }
#endif

    bool dt_needs_free;
    if(dt == 0) //tuple does not exist; reply with a delete tombstone for the key.
    {
        dt = tuple;
        dt->setDelete();
        dt_needs_free = false;
    } else {
        dt_needs_free = true;
    }
    DEBUG("find result: %s\n", dt->isDelete() ? "not found" : "found");
    //send the reply code
    int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);
    if(!err) {
        //send the tuple
        err = writetupletosocket(*(data->workitem), dt);
    }
    if(!err) {
        writeendofiteratortosocket(*(data->workitem));
    }
    //free datatuple
    if(dt_needs_free) {
        datatuple::freetuple(dt);
    }
    return err;
}

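// Range scan: stream tuples from `tuple` up to (but not including) `tuple2`
// back to the client, stopping early once `count` reaches `limit`; the stream
// is terminated with an end-of-iterator marker either way.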
int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t limit) {
    size_t count = 0;
    int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES);

    if(!err) {
        logtableIterator<datatuple> * itr = new logtableIterator<datatuple>(data->ltable, tuple);
        datatuple * t;
        while(!err && (t = itr->getnext())) {
            if(tuple2) { // are we at the end of range?
                if(datatuple::compare_obj(t, tuple2) >= 0) {
                    datatuple::freetuple(t);
                    break;
                }
            }
            err = writetupletosocket(*(data->workitem), t);
            datatuple::freetuple(t);
            count++;
            if(count == limit) { break; } // did we hit the limit?
        }
        delete itr;
    }
    if(!err) { writeendofiteratortosocket(*(data->workitem)); }
    return err;
}

int op_flush(pthread_data* data) {
    data->ltable->flushTable();
    return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}

int op_shutdown(pthread_data* data) {
    // XXX
    return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR);
}

int op_stat_space_usage(pthread_data* data) {

    int xid = Tbegin();

    readlock(data->ltable->header_lock, 0);

    pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
    pageid_t datapage_c1_region_count,  datapage_c1_mergeable_region_count  = 0, datapage_c2_region_count;
    pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
    pageid_t tree_c1_region_count,  tree_c1_mergeable_region_count  = 0, tree_c2_region_count;

    pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);

    pageid_t * datapage_c1_mergeable_regions = NULL;
    if(data->ltable->get_tree_c1_mergeable()) {
        datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
    }
    pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);

    recordid tree_c1_region_header = data->ltable->get_tree_c1()->get_tree_state();
    pageid_t * tree_c1_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);

    pageid_t * tree_c1_mergeable_regions = NULL;
    if(data->ltable->get_tree_c1_mergeable()) {
        recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state();
        tree_c1_mergeable_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
    }

    recordid tree_c2_region_header = data->ltable->get_tree_c2()->get_tree_state();
    pageid_t * tree_c2_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);

    free(datapage_c1_regions);
    free(datapage_c1_mergeable_regions);
    free(datapage_c2_regions);

    free(tree_c1_regions);
    free(tree_c1_mergeable_regions);
    free(tree_c2_regions);

    uint64_t treesize = PAGE_SIZE *
        ( ( datapage_c1_region_count * datapage_c1_region_length )
        + ( datapage_c1_mergeable_region_count * datapage_c1_mergeable_region_length )
        + ( datapage_c2_region_count * datapage_c2_region_length )
        + ( tree_c1_region_count * tree_c1_region_length )
        + ( tree_c1_mergeable_region_count * tree_c1_mergeable_region_length )
        + ( tree_c2_region_count * tree_c2_region_length ) );

    //walk the boundary tags to find the end of the store file
    boundary_tag tag;
    pageid_t pid = ROOT_RECORD.page;
    TregionReadBoundaryTag(xid, pid, &tag);
    uint64_t max_off = 0;
    do {
        max_off = pid + tag.size;
    } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));

    unlock(data->ltable->header_lock);

    Tcommit(xid);

    uint64_t filesize = max_off * PAGE_SIZE;
    datatuple *tup = datatuple::create(&treesize, sizeof(treesize), &filesize, sizeof(filesize));

    DEBUG("tree size: %lld, filesize %lld\n", treesize, filesize);

    int err = 0;
    if(!err){ err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
    if(!err){ err = writetupletosocket(*(data->workitem), tup); }
    if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }

    datatuple::freetuple(tup);

    return err;
}

int op_stat_perf_report(pthread_data* data) {
    // XXX not implemented yet; respond with an error so we don't fall off the
    // end of a non-void function (mirrors op_shutdown above).
    return writeoptosocket(*(data->workitem), LOGSTORE_UNIMPLEMENTED_ERROR);
}

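// Sample the keyspace of the C2 tree: count its internal-node keys, send the
// stride to the client, then send every stride-th key plus the last one, so
// at most `limit` roughly evenly spaced keys come back.
// Worked example: count = 10, limit = 4 gives stride = 10/3 + 1 = 4, so the
// keys at positions 1, 5, 9 and the last key (position 10) are returned.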
int op_stat_histogram(pthread_data* data, size_t limit) {

    if(limit < 3) {
        return writeoptosocket(*(data->workitem), LOGSTORE_PROTOCOL_ERROR);
    }

    int xid = Tbegin();
    diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rec());
    size_t count = 0;
    int err = 0;

    while(it->next()) { count++; }
    it->close();
    delete(it);

    uint64_t stride;

    if(count > limit) {
        stride = count / (limit-1);
        stride++; // this way, we truncate the last bucket instead of occasionally creating a tiny last bucket.
    } else {
        stride = 1;
    }

    datatuple * tup = datatuple::create(&stride, sizeof(stride));

    if(!err) { err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
    if(!err) { err = writetupletosocket(*(data->workitem), tup); }

    datatuple::freetuple(tup);

    size_t cur_stride = 0;
    size_t i = 0;
    it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rec());
    while(it->next()) {
        i++;
        if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
            byte * key;
            size_t keylen = it->key(&key);
            tup = datatuple::create(key, keylen);

            if(!err) { err = writetupletosocket(*(data->workitem), tup); }

            datatuple::freetuple(tup);
            cur_stride = stride;
        }
        cur_stride--;
    }

    it->close();
    delete(it);
    if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
    Tcommit(xid);
    return err;
}

int op_dbg_blockmap(pthread_data* data) {
    // produce a list of stasis regions
    int xid = Tbegin();

    readlock(data->ltable->header_lock, 0);

    // produce a list of regions used by current tree components
    pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
    pageid_t datapage_c1_region_count,  datapage_c1_mergeable_region_count  = 0, datapage_c2_region_count;
    pageid_t * datapage_c1_regions = data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
    pageid_t * datapage_c1_mergeable_regions = NULL;
    if(data->ltable->get_tree_c1_mergeable()) {
        datapage_c1_mergeable_regions = data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
    }
    pageid_t * datapage_c2_regions = data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);

    pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
    pageid_t tree_c1_region_count,  tree_c1_mergeable_region_count  = 0, tree_c2_region_count;

    recordid tree_c1_region_header = data->ltable->get_tree_c1()->get_tree_state();
    recordid tree_c2_region_header = data->ltable->get_tree_c2()->get_tree_state();

    pageid_t * tree_c1_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
    pageid_t * tree_c1_mergeable_regions = NULL;
    if(data->ltable->get_tree_c1_mergeable()) {
        recordid tree_c1_mergeable_region_header = data->ltable->get_tree_c1_mergeable()->get_tree_state();
        tree_c1_mergeable_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
    }
    pageid_t * tree_c2_regions = diskTreeComponent::internalNodes::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
    unlock(data->ltable->header_lock);

    Tcommit(xid);

    printf("C1 Datapage Regions (each is %lld pages long):\n", datapage_c1_region_length);
    for(pageid_t i = 0; i < datapage_c1_region_count; i++) {
        printf("%lld ", datapage_c1_regions[i]);
    }

    printf("\nC1 Internal Node Regions (each is %lld pages long):\n", tree_c1_region_length);
    for(pageid_t i = 0; i < tree_c1_region_count; i++) {
        printf("%lld ", tree_c1_regions[i]);
    }

    printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length);
    for(pageid_t i = 0; i < datapage_c2_region_count; i++) {
        printf("%lld ", datapage_c2_regions[i]);
    }

    printf("\nC2 Internal Node Regions (each is %lld pages long):\n", tree_c2_region_length);
    for(pageid_t i = 0; i < tree_c2_region_count; i++) {
        printf("%lld ", tree_c2_regions[i]);
    }
    printf("\nStasis Region Map\n");

    boundary_tag tag;
    pageid_t pid = ROOT_RECORD.page;
    TregionReadBoundaryTag(xid, pid, &tag);
    pageid_t max_off = 0;
    bool done;
    do {
        max_off = pid + tag.size;
        // print tag.
        printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
        done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
    } while(!done);

    printf("\n");

    printf("Tree components are using %lld megabytes. File is using %lld megabytes.\n",
           PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count
                        + tree_c1_mergeable_region_length * tree_c1_mergeable_region_count
                        + tree_c2_region_length * tree_c2_region_count
                        + datapage_c1_region_length * datapage_c1_region_count
                        + datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
                        + datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024),
           (PAGE_SIZE * max_off) / (1024*1024));

    free(datapage_c1_regions);
    if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions);
    free(datapage_c2_regions);
    free(tree_c1_regions);
    if(tree_c1_mergeable_regions) free(tree_c1_mergeable_regions);
    free(tree_c2_regions);
    return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}

int op_dbg_drop_database(pthread_data * data) {
    logtableIterator<datatuple> * itr = new logtableIterator<datatuple>(data->ltable);
    datatuple * del;
    fprintf(stderr, "DROPPING DATABASE...\n");
    long long n = 0;
    while((del = itr->getnext())) {
        if(!del->isDelete()) {
            del->setDelete();
            data->ltable->insertTuple(del);
            n++;
            if(!(n % 1000)) {
                printf("X %lld %s\n", n, (char*)del->key()); fflush(stdout);
            }
        } else {
            n++;
            if(!(n % 1000)) {
                printf("? %lld %s\n", n, (char*)del->key()); fflush(stdout);
            }
        }
        datatuple::freetuple(del);
    }
    delete itr;
    fprintf(stderr, "...DROP DATABASE COMPLETE\n");
    return writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SUCCESS);
}

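// Map a client opcode onto the handler above. OP_SCAN and OP_STAT_HISTOGRAM
// read one extra count argument off the socket before dispatching; the other
// opcodes only use the tuples that thread_work_fn() already read.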
int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, pthread_data* data) {
    int err = 0;
    if(opcode == OP_INSERT)
    {
        err = op_insert(data, tuple);
    }
    else if(opcode == OP_FIND)
    {
        err = op_find(data, tuple);
    }
    else if(opcode == OP_SCAN)
    {
        size_t limit = readcountfromsocket(*(data->workitem), &err);
        if(!err) { err = op_scan(data, tuple, tuple2, limit); }
    }
    else if(opcode == OP_FLUSH)
    {
        err = op_flush(data);
    }
    else if(opcode == OP_SHUTDOWN)
    {
        err = op_shutdown(data);
    }
    else if(opcode == OP_STAT_SPACE_USAGE)
    {
        err = op_stat_space_usage(data);
    }
    else if(opcode == OP_STAT_PERF_REPORT)
    {
        err = op_stat_perf_report(data);
    }
    else if(opcode == OP_STAT_HISTOGRAM)
    {
        size_t limit = readcountfromsocket(*(data->workitem), &err);
        if(!err) { err = op_stat_histogram(data, limit); }
    }
    else if(opcode == OP_DBG_BLOCKMAP)
    {
        err = op_dbg_blockmap(data);
    }
    else if(opcode == OP_DBG_DROP_DATABASE)
    {
        err = op_dbg_drop_database(data);
    }
    return err;
}

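// Worker thread body. Each worker sleeps on its condition variable until the
// event loop assigns it a socket (via *workitem), then: (1) reads the opcode,
// (2) reads up to two tuples, (3) dispatches the request, and finally either
// closes the connection (on error or OP_DONE) or pushes it back onto the
// ready queue and pokes the self-pipe so eventLoop() resumes watching it.
// Before going idle, it pulls any pending socket from work_queue.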
void * thread_work_fn( void * args)
{
    pthread_item * item = (pthread_item *) args;

    pthread_mutex_lock(item->data->th_mut);
    while(true)
    {
        while(*(item->data->workitem) == -1)
        {
            if(!*(item->data->sys_alive))
                break;
            pthread_cond_wait(item->data->th_cond, item->data->th_mut); //wait for job
        }

#ifdef STATS_ENABLED
        gettimeofday(& (item->data->start_tv), 0);
        std::ostringstream ostr;
        ostr << *(item->data->workitem) << "_";
#endif

        if(!*(item->data->sys_alive))
        {
            break;
        }

        //step 1: read the opcode
        network_op_t opcode = readopfromsocket(*(item->data->workitem), LOGSTORE_CLIENT_REQUEST);
        if(opcode == LOGSTORE_CONN_CLOSED_ERROR) {
            opcode = OP_DONE;
            printf("Broken client closed connection uncleanly\n");
        }

        int err = opcode == OP_DONE || opiserror(opcode); //close the conn on failure

        //step 2: read the first tuple from client
        datatuple *tuple = 0, *tuple2 = 0;
        if(!err) { tuple = readtuplefromsocket(*(item->data->workitem), &err); }
        // read the second tuple from client
        if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); }

        //step 3: process the request
        if(!err) { err = dispatch_request(opcode, tuple, tuple2, item->data); }

        //free the tuples
        if(tuple)  datatuple::freetuple(tuple);
        if(tuple2) datatuple::freetuple(tuple2);

        pthread_mutex_lock(item->data->qlock);

        // Deal with the old work item by closing it or putting it back in the ready queue.
        if(err) {
            if(opcode != OP_DONE) {
                char *msg;
                if(-1 != asprintf(&msg, "network error. conn closed. (%d, %d) ",
                                  *(item->data->workitem), (int)item->data->work_queue->size())) {
                    perror(msg);
                    free(msg);
                } else {
                    printf("error preparing string for perror!\n");
                }
            } else {
                // printf("client done. conn closed. (%d, %d)\n",
                //        *(item->data->workitem), item->data->work_queue->size());
            }
            close(*(item->data->workitem));

        } else {

            //add conn desc to ready queue
            item->data->ready_queue->push(*(item->data->workitem));

            if(item->data->ready_queue->size() == 1) { //signal the event loop
                pthread_cond_signal(item->data->selcond);
                char gunk = 13;
                int ret = write(item->data->self_pipe[1], &gunk, 1);
                if(ret == -1) {
                    if(errno != EAGAIN) {
                        perror("Couldn't write to self_pipe!");
                        abort();
                    }
                } else {
                    assert(ret == 1);
                }
            }
        }

        if(item->data->work_queue->size() > 0)
        {
            int new_work = item->data->work_queue->front();
            item->data->work_queue->pop();
            *(item->data->workitem) = new_work;
        }
        else
        {
            //set work to -1
            *(item->data->workitem) = -1;
            //add self to idle queue
            item->data->idleth_queue->push(*item);
        }
        pthread_mutex_unlock(item->data->qlock);

        if(!err) {
#ifdef STATS_ENABLED
            if( item->data->num_reqs == 0 )
                item->data->work_time = 0;
            gettimeofday(& (item->data->stop_tv), 0);
            (item->data->num_reqs)++;
            item->data->work_time += (item->data->stop_tv.tv_sec - item->data->start_tv.tv_sec) * 1000 +
                (item->data->stop_tv.tv_usec / 1000 - item->data->start_tv.tv_usec / 1000);

            int iopcode = opcode;
            ostr << iopcode;
            std::string clientkey = ostr.str();
            if(item->data->num_reqsc.find(clientkey) == item->data->num_reqsc.end())
            {
                item->data->num_reqsc[clientkey] = 0;
                item->data->work_timec[clientkey] = 0;
            }

            item->data->num_reqsc[clientkey]++;
            item->data->work_timec[clientkey] += (item->data->stop_tv.tv_sec - item->data->start_tv.tv_sec) * 1000 +
                (item->data->stop_tv.tv_usec / 1000 - item->data->start_tv.tv_usec / 1000);
#endif
        }
    }
    pthread_mutex_unlock(item->data->th_mut);

    return NULL;
}