fix shutdown bug; add debugging utility that allows the log to be enabled / batched / disabled at runtime
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2437 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
8135bbcc2e
commit
c42a275ab4
8 changed files with 66 additions and 53 deletions
18
logstore.cpp
18
logstore.cpp
|
@ -60,14 +60,10 @@ logtable<TUPLE>::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_
|
|||
this->datapage_size = datapage_size;
|
||||
|
||||
this->log_mode = log_mode;
|
||||
this->batch_size++;
|
||||
if(log_mode > 0) {
|
||||
this->batch_size = 0;
|
||||
log_file = stasis_log_file_pool_open("lsm_log",
|
||||
stasis_log_file_mode,
|
||||
stasis_log_file_permissions);
|
||||
} else {
|
||||
log_file = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
template<class TUPLE>
|
||||
|
@ -85,6 +81,8 @@ logtable<TUPLE>::~logtable()
|
|||
memTreeComponent<datatuple>::tearDownTree(tree_c0);
|
||||
}
|
||||
|
||||
log_file->close(log_file);
|
||||
|
||||
pthread_mutex_destroy(&rb_mut);
|
||||
rwlc_deletelock(header_mut);
|
||||
pthread_cond_destroy(&c0_needed);
|
||||
|
@ -98,9 +96,6 @@ template<class TUPLE>
|
|||
void logtable<TUPLE>::init_stasis() {
|
||||
|
||||
DataPage<datatuple>::register_stasis_page_impl();
|
||||
//stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
|
||||
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
|
||||
// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
|
||||
stasis_buffer_manager_hint_writes_are_sequential = 1;
|
||||
Tinit();
|
||||
|
||||
|
@ -154,7 +149,6 @@ void logtable<TUPLE>::logUpdate(datatuple * tup) {
|
|||
|
||||
template<class TUPLE>
|
||||
void logtable<TUPLE>::replayLog() {
|
||||
if(!log_file) { assert(!log_mode); recovering = false; return; }
|
||||
lsn_t start = tbl_header.log_trunc;
|
||||
LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file);
|
||||
const LogEntry * e;
|
||||
|
@ -184,10 +178,12 @@ void logtable<TUPLE>::truncate_log() {
|
|||
if(recovering) {
|
||||
printf("Not truncating log until recovery is complete.\n");
|
||||
} else {
|
||||
if(tbl_header.log_trunc) {
|
||||
printf("truncating log to %lld\n", tbl_header.log_trunc);
|
||||
log_file->truncate(log_file, tbl_header.log_trunc);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template<class TUPLE>
|
||||
void logtable<TUPLE>::update_persistent_header(int xid, lsn_t trunc_lsn) {
|
||||
|
@ -569,7 +565,7 @@ void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
|
|||
for(int i = 0; i < tuple_count; i++) {
|
||||
merge_mgr->read_tuple_from_small_component(0, tuples[i]);
|
||||
}
|
||||
if(log_file && !recovering) {
|
||||
if(log_mode && !recovering) {
|
||||
for(int i = 0; i < tuple_count; i++) {
|
||||
logUpdate(tuples[i]);
|
||||
}
|
||||
|
@ -598,7 +594,7 @@ void logtable<TUPLE>::insertManyTuples(datatuple ** tuples, int tuple_count) {
|
|||
template<class TUPLE>
|
||||
void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
||||
{
|
||||
if(log_file && !recovering) {
|
||||
if(log_mode && !recovering) {
|
||||
logUpdate(tuple);
|
||||
batch_size++;
|
||||
if(batch_size >= log_mode) {
|
||||
|
|
|
@ -51,7 +51,8 @@ static const network_op_t OP_STAT_HISTOGRAM = 18; // Return N approximatel
|
|||
static const network_op_t OP_DBG_DROP_DATABASE = 19;
|
||||
static const network_op_t OP_DBG_BLOCKMAP = 20;
|
||||
static const network_op_t OP_DBG_NOOP = 21;
|
||||
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 21;
|
||||
static const network_op_t OP_DBG_SET_LOG_MODE = 22;
|
||||
static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 22;
|
||||
|
||||
//error codes
|
||||
static const network_op_t LOGSTORE_FIRST_ERROR = 27;
|
||||
|
|
|
@ -49,8 +49,7 @@ int main(int argc, char *argv[])
|
|||
|
||||
|
||||
recordid table_root = ROOT_RECORD;
|
||||
|
||||
|
||||
{
|
||||
logtable<datatuple> ltable(log_mode, c0_size);
|
||||
|
||||
if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
|
||||
|
@ -82,6 +81,7 @@ int main(int argc, char *argv[])
|
|||
|
||||
printf("Deinitializing stasis...\n");
|
||||
fflush(stdout);
|
||||
}
|
||||
logtable<datatuple>::deinit_stasis();
|
||||
|
||||
printf("Shutdown complete\n");
|
||||
|
|
|
@ -421,7 +421,18 @@ template<class HANDLE>
|
|||
inline int requestDispatch<HANDLE>::op_dbg_noop(logtable<datatuple> * ltable, HANDLE fd) {
|
||||
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
|
||||
}
|
||||
|
||||
template<class HANDLE>
|
||||
inline int requestDispatch<HANDLE>::op_dbg_set_log_mode(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple) {
|
||||
if(tuple->keylen() != sizeof(int)) {
|
||||
abort();
|
||||
return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR);
|
||||
} else {
|
||||
int old_mode = ltable->log_mode;
|
||||
ltable->log_mode = *(int*)tuple->key();
|
||||
fprintf(stderr, "\n\nChanged log mode from %d to %d\n\n", old_mode, ltable->log_mode);
|
||||
return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS);
|
||||
}
|
||||
}
|
||||
template<class HANDLE>
|
||||
int requestDispatch<HANDLE>::dispatch_request(HANDLE f, logtable<datatuple>*ltable) {
|
||||
//step 1: read the opcode
|
||||
|
@ -514,6 +525,9 @@ int requestDispatch<HANDLE>::dispatch_request(network_op_t opcode, datatuple * t
|
|||
else if(opcode == OP_DBG_NOOP) {
|
||||
err = op_dbg_noop(ltable, fd);
|
||||
}
|
||||
else if(opcode == OP_DBG_SET_LOG_MODE) {
|
||||
err = op_dbg_set_log_mode(ltable, fd, tuple);
|
||||
}
|
||||
return err;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ private:
|
|||
static inline int op_dbg_blockmap(logtable<datatuple> * ltable, HANDLE fd);
|
||||
static inline int op_dbg_drop_database(logtable<datatuple> * ltable, HANDLE fd);
|
||||
static inline int op_dbg_noop(logtable<datatuple> * ltable, HANDLE fd);
|
||||
static inline int op_dbg_set_log_mode(logtable<datatuple> * ltable, HANDLE fd, datatuple * tuple);
|
||||
|
||||
public:
|
||||
static int dispatch_request(HANDLE f, logtable<datatuple> * ltable);
|
||||
|
|
|
@ -31,10 +31,13 @@ void * worker_wrap(void * arg) {
|
|||
}
|
||||
|
||||
void * simpleServer::worker(int self) {
|
||||
int mybufsize =128*1024;
|
||||
char * bigbuffer = (char*)malloc(mybufsize);
|
||||
pthread_mutex_lock(&thread_mut[self]);
|
||||
while(true) {
|
||||
while(thread_fd[self] == -1) {
|
||||
if(!ltable->accepting_new_requests) {
|
||||
free(bigbuffer);
|
||||
pthread_mutex_unlock(&thread_mut[self]);
|
||||
return 0;
|
||||
}
|
||||
|
@ -42,12 +45,9 @@ void * simpleServer::worker(int self) {
|
|||
}
|
||||
pthread_mutex_unlock(&thread_mut[self]);
|
||||
FILE * f = fdopen(thread_fd[self], "a+");
|
||||
int mybufsize =128*1024;
|
||||
char * bigbuffer = (char*)malloc(mybufsize);
|
||||
setbuffer(f, bigbuffer, mybufsize);
|
||||
while(!requestDispatch<FILE*>::dispatch_request(f, ltable)) { }
|
||||
fclose(f);
|
||||
free(bigbuffer);
|
||||
pthread_mutex_lock(&thread_mut[self]);
|
||||
thread_fd[self] = -1;
|
||||
}
|
||||
|
|
|
@ -78,7 +78,7 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
|||
if (l->server_socket < 0)
|
||||
{
|
||||
perror("ERROR opening socket.\n");
|
||||
return 0;
|
||||
return LOGSTORE_CONN_CLOSED_ERROR;
|
||||
}
|
||||
|
||||
#ifdef LOGSTORE_NODELAY
|
||||
|
@ -92,14 +92,14 @@ logstore_client_op_returns_many(logstore_handle_t *l,
|
|||
if (result < 0)
|
||||
{
|
||||
perror("ERROR on setting socket option TCP_NODELAY.\n");
|
||||
return 0;
|
||||
return LOGSTORE_CONN_CLOSED_ERROR;
|
||||
}
|
||||
#endif
|
||||
/* connect: create a connection with the server */
|
||||
if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0)
|
||||
{
|
||||
perror("ERROR connecting\n");
|
||||
return 0;
|
||||
return LOGSTORE_CONN_CLOSED_ERROR;
|
||||
}
|
||||
|
||||
DEBUG("sock opened %d\n", l->server_socket);
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
CREATE_CLIENT_EXECUTABLE(change_log_mode)
|
||||
CREATE_CLIENT_EXECUTABLE(copy_database)
|
||||
CREATE_CLIENT_EXECUTABLE(dump_blockmap)
|
||||
CREATE_CLIENT_EXECUTABLE(drop_database)
|
||||
|
|
Loading…
Reference in a new issue