diff --git a/logstore.cpp b/logstore.cpp index 250c062..4ceffa1 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -60,14 +60,10 @@ logtable::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_ this->datapage_size = datapage_size; this->log_mode = log_mode; - this->batch_size++; - if(log_mode > 0) { - log_file = stasis_log_file_pool_open("lsm_log", - stasis_log_file_mode, - stasis_log_file_permissions); - } else { - log_file = NULL; - } + this->batch_size = 0; + log_file = stasis_log_file_pool_open("lsm_log", + stasis_log_file_mode, + stasis_log_file_permissions); } template @@ -85,6 +81,8 @@ logtable::~logtable() memTreeComponent::tearDownTree(tree_c0); } + log_file->close(log_file); + pthread_mutex_destroy(&rb_mut); rwlc_deletelock(header_mut); pthread_cond_destroy(&c0_needed); @@ -98,9 +96,6 @@ template void logtable::init_stasis() { DataPage::register_stasis_page_impl(); - //stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages: - // XXX Workaround Stasis' (still broken) default concurrent buffer manager -// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; stasis_buffer_manager_hint_writes_are_sequential = 1; Tinit(); @@ -154,7 +149,6 @@ void logtable::logUpdate(datatuple * tup) { template void logtable::replayLog() { - if(!log_file) { assert(!log_mode); recovering = false; return; } lsn_t start = tbl_header.log_trunc; LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file); const LogEntry * e; @@ -184,8 +178,10 @@ void logtable::truncate_log() { if(recovering) { printf("Not truncating log until recovery is complete.\n"); } else { - printf("truncating log to %lld\n", tbl_header.log_trunc); - log_file->truncate(log_file, tbl_header.log_trunc); + if(tbl_header.log_trunc) { + printf("truncating log to %lld\n", tbl_header.log_trunc); + log_file->truncate(log_file, tbl_header.log_trunc); + } } } @@ -569,7 +565,7 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { for(int i = 0; i < tuple_count; i++) { merge_mgr->read_tuple_from_small_component(0, tuples[i]); } - if(log_file && !recovering) { + if(log_mode && !recovering) { for(int i = 0; i < tuple_count; i++) { logUpdate(tuples[i]); } @@ -598,7 +594,7 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { template void logtable::insertTuple(datatuple *tuple) { - if(log_file && !recovering) { + if(log_mode && !recovering) { logUpdate(tuple); batch_size++; if(batch_size >= log_mode) { diff --git a/network.h b/network.h index 0db6294..02d63f4 100644 --- a/network.h +++ b/network.h @@ -51,7 +51,8 @@ static const network_op_t OP_STAT_HISTOGRAM = 18; // Return N approximatel static const network_op_t OP_DBG_DROP_DATABASE = 19; static const network_op_t OP_DBG_BLOCKMAP = 20; static const network_op_t OP_DBG_NOOP = 21; -static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 21; +static const network_op_t OP_DBG_SET_LOG_MODE = 22; +static const network_op_t LOGSTORE_LAST_REQUEST_CODE = 22; //error codes static const network_op_t LOGSTORE_FIRST_ERROR = 27; diff --git a/newserver.cpp b/newserver.cpp index 778d9c6..c5dfb35 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -49,39 +49,39 @@ int main(int argc, char *argv[]) recordid table_root = ROOT_RECORD; + { + logtable ltable(log_mode, c0_size); + if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { + printf("Creating empty logstore\n"); + table_root = ltable.allocTable(xid); + assert(table_root.page == ROOT_RECORD.page && + table_root.slot == ROOT_RECORD.slot); + } else { + printf("Opened existing logstore\n"); + table_root.size = TrecordSize(xid, ROOT_RECORD); + ltable.openTable(xid, table_root); + } - logtable ltable(log_mode, c0_size); + Tcommit(xid); + merge_scheduler * mscheduler = new merge_scheduler(<able); + mscheduler->start(); + ltable.replayLog(); - if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { - printf("Creating empty logstore\n"); - table_root = ltable.allocTable(xid); - assert(table_root.page == ROOT_RECORD.page && - table_root.slot == ROOT_RECORD.slot); - } else { - printf("Opened existing logstore\n"); - table_root.size = TrecordSize(xid, ROOT_RECORD); - ltable.openTable(xid, table_root); + simpleServer *lserver = new simpleServer(<able); + + lserver->acceptLoop(); + + printf ("Stopping server...\n"); + delete lserver; + + printf("Stopping merge threads...\n"); + mscheduler->shutdown(); + delete mscheduler; + + printf("Deinitializing stasis...\n"); + fflush(stdout); } - - Tcommit(xid); - merge_scheduler * mscheduler = new merge_scheduler(<able); - mscheduler->start(); - ltable.replayLog(); - - simpleServer *lserver = new simpleServer(<able); - - lserver->acceptLoop(); - - printf ("Stopping server...\n"); - delete lserver; - - printf("Stopping merge threads...\n"); - mscheduler->shutdown(); - delete mscheduler; - - printf("Deinitializing stasis...\n"); - fflush(stdout); logtable::deinit_stasis(); printf("Shutdown complete\n"); diff --git a/requestDispatch.cpp b/requestDispatch.cpp index fdc9211..28c0284 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -421,7 +421,18 @@ template inline int requestDispatch::op_dbg_noop(logtable * ltable, HANDLE fd) { return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } - +template +inline int requestDispatch::op_dbg_set_log_mode(logtable * ltable, HANDLE fd, datatuple * tuple) { + if(tuple->keylen() != sizeof(int)) { + abort(); + return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR); + } else { + int old_mode = ltable->log_mode; + ltable->log_mode = *(int*)tuple->key(); + fprintf(stderr, "\n\nChanged log mode from %d to %d\n\n", old_mode, ltable->log_mode); + return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); + } +} template int requestDispatch::dispatch_request(HANDLE f, logtable*ltable) { //step 1: read the opcode @@ -514,6 +525,9 @@ int requestDispatch::dispatch_request(network_op_t opcode, datatuple * t else if(opcode == OP_DBG_NOOP) { err = op_dbg_noop(ltable, fd); } + else if(opcode == OP_DBG_SET_LOG_MODE) { + err = op_dbg_set_log_mode(ltable, fd, tuple); + } return err; } diff --git a/requestDispatch.h b/requestDispatch.h index cbc37f0..d737b3f 100644 --- a/requestDispatch.h +++ b/requestDispatch.h @@ -26,6 +26,7 @@ private: static inline int op_dbg_blockmap(logtable * ltable, HANDLE fd); static inline int op_dbg_drop_database(logtable * ltable, HANDLE fd); static inline int op_dbg_noop(logtable * ltable, HANDLE fd); + static inline int op_dbg_set_log_mode(logtable * ltable, HANDLE fd, datatuple * tuple); public: static int dispatch_request(HANDLE f, logtable * ltable); diff --git a/simpleServer.cpp b/simpleServer.cpp index 2ea87d9..0875792 100644 --- a/simpleServer.cpp +++ b/simpleServer.cpp @@ -31,10 +31,13 @@ void * worker_wrap(void * arg) { } void * simpleServer::worker(int self) { + int mybufsize =128*1024; + char * bigbuffer = (char*)malloc(mybufsize); pthread_mutex_lock(&thread_mut[self]); while(true) { while(thread_fd[self] == -1) { if(!ltable->accepting_new_requests) { + free(bigbuffer); pthread_mutex_unlock(&thread_mut[self]); return 0; } @@ -42,12 +45,9 @@ void * simpleServer::worker(int self) { } pthread_mutex_unlock(&thread_mut[self]); FILE * f = fdopen(thread_fd[self], "a+"); - int mybufsize =128*1024; - char * bigbuffer = (char*)malloc(mybufsize); setbuffer(f, bigbuffer, mybufsize); while(!requestDispatch::dispatch_request(f, ltable)) { } fclose(f); - free(bigbuffer); pthread_mutex_lock(&thread_mut[self]); thread_fd[self] = -1; } diff --git a/tcpclient.cpp b/tcpclient.cpp index c0e30fd..a367fa8 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -78,7 +78,7 @@ logstore_client_op_returns_many(logstore_handle_t *l, if (l->server_socket < 0) { perror("ERROR opening socket.\n"); - return 0; + return LOGSTORE_CONN_CLOSED_ERROR; } #ifdef LOGSTORE_NODELAY @@ -92,14 +92,14 @@ logstore_client_op_returns_many(logstore_handle_t *l, if (result < 0) { perror("ERROR on setting socket option TCP_NODELAY.\n"); - return 0; + return LOGSTORE_CONN_CLOSED_ERROR; } #endif /* connect: create a connection with the server */ if (connect(l->server_socket, (sockaddr*) &(l->serveraddr), sizeof(l->serveraddr)) < 0) { perror("ERROR connecting\n"); - return 0; + return LOGSTORE_CONN_CLOSED_ERROR; } DEBUG("sock opened %d\n", l->server_socket); diff --git a/util/CMakeLists.txt b/util/CMakeLists.txt index f980374..4d5faec 100644 --- a/util/CMakeLists.txt +++ b/util/CMakeLists.txt @@ -1,3 +1,4 @@ +CREATE_CLIENT_EXECUTABLE(change_log_mode) CREATE_CLIENT_EXECUTABLE(copy_database) CREATE_CLIENT_EXECUTABLE(dump_blockmap) CREATE_CLIENT_EXECUTABLE(drop_database)