more allocator cleanups (still leaks space). Added dump_blockmap utility
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@578 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
e2ea48d233
commit
5032e77b6c
8 changed files with 148 additions and 46 deletions
|
@ -11,7 +11,7 @@ Project(Stasis)
|
|||
|
||||
SET(PACKAGE_VERSION 1)
|
||||
|
||||
SUBDIRS(test)
|
||||
SUBDIRS(test util)
|
||||
|
||||
# Main decisions
|
||||
SET(BUILD_SHARED_LIBS ON)
|
||||
|
|
13
datapage.h
13
datapage.h
|
@ -109,6 +109,19 @@ public:
|
|||
TregionDealloc(xid, pid);
|
||||
#endif
|
||||
}
|
||||
printf("Warning: leaking arraylist %lld in datapage\n", (long long)header_.region_list.page);
|
||||
// TarrayListDealloc(xid, header_.region_list);
|
||||
}
|
||||
pageid_t * list_regions(int xid, pageid_t * region_length, pageid_t * region_count) {
|
||||
*region_count = TarrayListLength(xid, header_.region_list);
|
||||
pageid_t * ret = (pageid_t*)malloc(sizeof(pageid_t) * *region_count);
|
||||
recordid rid = header_.region_list;
|
||||
for(pageid_t i = 0; i < *region_count; i++) {
|
||||
rid.slot = i;
|
||||
Tread(xid, rid, &ret[i]);
|
||||
}
|
||||
*region_length = header_.region_length;
|
||||
return ret;
|
||||
}
|
||||
void done() {
|
||||
nextPage_ = INVALID_PAGE;
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
|
||||
|
||||
|
||||
#include "logserver.h"
|
||||
#include "datatuple.h"
|
||||
#include "merger.h"
|
||||
|
||||
#include "logstore.h"
|
||||
|
||||
|
@ -16,6 +14,8 @@
|
|||
#include <sys/select.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <stasis/operations/regions.h>
|
||||
|
||||
#undef begin
|
||||
#undef end
|
||||
#undef try
|
||||
|
@ -495,9 +495,65 @@ void * thread_work_fn( void * args)
|
|||
else if(opcode == OP_DBG_BLOCKMAP)
|
||||
{
|
||||
// produce a list of stasis regions
|
||||
int xid = Tbegin();
|
||||
|
||||
readlock(item->data->ltable->getMergeData()->header_lock, 0);
|
||||
|
||||
// produce a list of regions used by current tree components
|
||||
pageid_t datapage_c1_region_length, datapage_c2_region_length;
|
||||
pageid_t datapage_c1_region_count, datapage_c2_region_count;
|
||||
pageid_t * datapage_c1_regions = item->data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
|
||||
pageid_t * datapage_c2_regions = item->data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
|
||||
|
||||
pageid_t tree_c1_region_length, tree_c2_region_length;
|
||||
pageid_t tree_c1_region_count, tree_c2_region_count;
|
||||
|
||||
recordid tree_c1_region_header = item->data->ltable->get_treestate1();
|
||||
recordid tree_c2_region_header = item->data->ltable->get_treestate2();
|
||||
|
||||
pageid_t * tree_c1_regions = logtree::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
|
||||
pageid_t * tree_c2_regions = logtree::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
|
||||
unlock(item->data->ltable->getMergeData()->header_lock);
|
||||
|
||||
Tcommit(xid);
|
||||
|
||||
printf("C1 Datapage Regions (each is %lld pages long):\n", datapage_c1_region_length);
|
||||
for(pageid_t i = 0; i < datapage_c1_region_count; i++) {
|
||||
printf("%lld ", datapage_c1_regions[i]);
|
||||
}
|
||||
|
||||
printf("\nC1 Internal Node Regions (each is %lld pages long):\n", tree_c1_region_length);
|
||||
for(pageid_t i = 0; i < tree_c1_region_count; i++) {
|
||||
printf("%lld ", tree_c1_regions[i]);
|
||||
}
|
||||
|
||||
printf("\nC2 Datapage Regions (each is %lld pages long):\n", datapage_c2_region_length);
|
||||
for(pageid_t i = 0; i < datapage_c2_region_count; i++) {
|
||||
printf("%lld ", datapage_c2_regions[i]);
|
||||
}
|
||||
|
||||
printf("\nC2 Internal Node Regions (each is %lld pages long):\n", tree_c2_region_length);
|
||||
for(pageid_t i = 0; i < tree_c2_region_count; i++) {
|
||||
printf("%lld ", tree_c2_regions[i]);
|
||||
}
|
||||
printf("\nStasis Region Map\n");
|
||||
|
||||
boundary_tag tag;
|
||||
pageid_t pid = ROOT_RECORD.page;
|
||||
TregionReadBoundaryTag(xid, pid, &tag);
|
||||
bool done;
|
||||
do {
|
||||
// print tag.
|
||||
printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
|
||||
done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
|
||||
} while(!done);
|
||||
|
||||
printf("\n");
|
||||
free(datapage_c1_regions);
|
||||
free(datapage_c2_regions);
|
||||
free(tree_c1_regions);
|
||||
free(tree_c2_regions);
|
||||
err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS);
|
||||
|
||||
}
|
||||
|
||||
|
|
36
logstore.cpp
36
logstore.cpp
|
@ -38,13 +38,6 @@ const size_t logtree::root_rec_size = sizeof(int64_t);
|
|||
const int64_t logtree::PREV_LEAF = 0; //pointer to prev leaf page
|
||||
const int64_t logtree::NEXT_LEAF = 1; //pointer to next leaf page
|
||||
|
||||
|
||||
|
||||
logtree::logtree()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
void logtree::init_stasis() {
|
||||
|
||||
bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
|
||||
|
@ -82,6 +75,9 @@ void logtree::dealloc_region_rid(int xid, void *conf)
|
|||
Tread(xid,a.regionList,&pid);
|
||||
TregionDealloc(xid,pid);
|
||||
}
|
||||
a.regionList.slot = 0;
|
||||
printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page);
|
||||
// TarrayListDealloc(xid, a.regionList);
|
||||
}
|
||||
|
||||
|
||||
|
@ -152,6 +148,21 @@ pageid_t logtree::alloc_region_rid(int xid, void * ridp) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
pageid_t * logtree::list_region_rid(int xid, void *ridp, pageid_t * region_len, pageid_t * region_count) {
|
||||
recordid header = *(recordid*)ridp;
|
||||
RegionAllocConf_t conf;
|
||||
Tread(xid,header,&conf);
|
||||
recordid header_list = conf.regionList;
|
||||
*region_len = conf.regionSize;
|
||||
*region_count = conf.regionCount;
|
||||
pageid_t * ret = (pageid_t*) malloc(sizeof(pageid_t) * *region_count);
|
||||
for(pageid_t i = 0; i < *region_count; i++) {
|
||||
header_list.slot = i;
|
||||
Tread(xid,header_list,&ret[i]);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
recordid logtree::create(int xid)
|
||||
|
@ -876,17 +887,14 @@ recordid logtable::allocTable(int xid)
|
|||
table_rec = Talloc(xid, sizeof(tbl_header));
|
||||
|
||||
//create the big tree
|
||||
tree_c2 = new logtree();
|
||||
tbl_header.c2_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
|
||||
tree_c2 = new logtree(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c2_dp_state, 10000)); /// XXX do not hard code region length.
|
||||
tree_c2->create(xid);
|
||||
|
||||
tbl_header.c2_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
|
||||
tree_c2->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c2_dp_state, 10000)); /// XXX do not hard code region length.
|
||||
|
||||
//create the small tree
|
||||
tree_c1 = new logtree();
|
||||
tree_c1->create(xid);
|
||||
tbl_header.c1_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
|
||||
tree_c1->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c1_dp_state, 10000)); /// XXX do not hard code region length.
|
||||
tree_c1 = new logtree(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c1_dp_state, 10000)); /// XXX do not hard code region length.
|
||||
tree_c1->create(xid);
|
||||
|
||||
tbl_header.c2_root = tree_c2->get_root_rec();
|
||||
tbl_header.c2_state = tree_c2->get_tree_state();
|
||||
|
|
|
@ -59,7 +59,7 @@ typedef void(*logtree_page_deallocator_t)(int, void *);
|
|||
|
||||
class logtree{
|
||||
public:
|
||||
logtree();
|
||||
logtree(DataPage<datatuple>::RegionAllocator * alloc): region_alloc(alloc) {}
|
||||
|
||||
recordid create(int xid);
|
||||
|
||||
|
@ -72,6 +72,7 @@ private:
|
|||
public:
|
||||
static pageid_t alloc_region_rid(int xid, void * ridp);
|
||||
static void force_region_rid(int xid, void *conf);
|
||||
static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count);
|
||||
static void dealloc_region_rid(int xid, void *conf);
|
||||
static void free_region_rid(int xid, recordid tree,
|
||||
logtree_page_deallocator_t dealloc,
|
||||
|
@ -128,7 +129,7 @@ public:
|
|||
void *allocator_state);
|
||||
|
||||
inline DataPage<datatuple>::RegionAllocator* get_alloc() { return region_alloc; }
|
||||
inline void set_alloc(DataPage<datatuple>::RegionAllocator* a1) { region_alloc = a1; } // XXX kludge; must be a better api for this
|
||||
// inline void set_alloc(DataPage<datatuple>::RegionAllocator* a1) { region_alloc = a1; } // XXX kludge; must be a better api for this
|
||||
// (currently, need to get rid from dpstate. add a 'register' method that sets the rid of the region allocator?)
|
||||
|
||||
/**
|
||||
|
@ -202,6 +203,8 @@ public:
|
|||
|
||||
inline recordid get_dpstate1(){return tbl_header.c1_dp_state;}
|
||||
inline recordid get_dpstate2(){return tbl_header.c2_dp_state;}
|
||||
inline recordid get_treestate1(){return tbl_header.c1_state;}
|
||||
inline recordid get_treestate2(){return tbl_header.c2_state;}
|
||||
|
||||
int get_fixed_page_count(){return fixed_page_count;}
|
||||
void set_fixed_page_count(int count){fixed_page_count = count;}
|
||||
|
|
31
merger.cpp
31
merger.cpp
|
@ -234,7 +234,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
|
|||
|
||||
}
|
||||
|
||||
//TODO: flush the data pages
|
||||
// deallocate/free their region
|
||||
// create new data region for new data pages
|
||||
void* memMergeThread(void*arg)
|
||||
|
@ -310,14 +309,12 @@ void* memMergeThread(void*arg)
|
|||
xid = Tbegin();
|
||||
|
||||
//create a new tree
|
||||
logtree * scratch_tree = new logtree;
|
||||
logtree * scratch_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size)
|
||||
recordid scratch_root = scratch_tree->create(xid);
|
||||
|
||||
//save the old dp state values
|
||||
DataPage<datatuple>::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc();
|
||||
old_alloc->done(); // XXX do this earlier
|
||||
//reinitialize the dp state
|
||||
scratch_tree->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size
|
||||
|
||||
//pthread_mutex_unlock(a->block_ready_mut);
|
||||
unlock(ltable->mergedata->header_lock);
|
||||
|
@ -333,7 +330,6 @@ void* memMergeThread(void*arg)
|
|||
|
||||
//force write the new region to disk
|
||||
recordid scratch_alloc_state = scratch_tree->get_tree_state();
|
||||
// XXX When called by merger_check (at least), we hold a pin on a page that should be forced. This causes stasis to abort() the process.
|
||||
logtree::force_region_rid(xid, &scratch_alloc_state);
|
||||
//force write the new datapages
|
||||
scratch_tree->get_alloc()->force_regions(xid);
|
||||
|
@ -399,12 +395,11 @@ void* memMergeThread(void*arg)
|
|||
|
||||
*a->out_tree = scratch_tree;
|
||||
xid = Tbegin();
|
||||
// Tread(xid, ltable->get_dpstate1(), a->out_tree_allocer);
|
||||
|
||||
pthread_cond_signal(a->out_block_ready_cond);
|
||||
|
||||
|
||||
logtree *empty_tree = new logtree;
|
||||
logtree *empty_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size);
|
||||
empty_tree->create(xid);
|
||||
|
||||
*(recordid*)(a->pageAllocState) = empty_tree->get_tree_state();
|
||||
|
@ -419,9 +414,6 @@ void* memMergeThread(void*arg)
|
|||
h.c1_state = empty_tree->get_tree_state(); //update index alloc state
|
||||
printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page);
|
||||
Tset(xid, a->tree, &h);
|
||||
//update datapage alloc state
|
||||
ltable->get_tree_c1()->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size
|
||||
|
||||
Tcommit(xid);
|
||||
//xid = Tbegin();
|
||||
|
||||
|
@ -510,27 +502,17 @@ void *diskMergeThread(void*arg)
|
|||
treeIterator<datatuple> *itrB =
|
||||
new treeIterator<datatuple>((*a->in_tree)->get_root_rec());
|
||||
|
||||
//Tcommit(xid);
|
||||
xid = Tbegin();
|
||||
|
||||
//create a new tree
|
||||
logtree * scratch_tree = new logtree;
|
||||
//TODO: maybe you want larger regions for the second tree?
|
||||
logtree * scratch_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2() /*rid of old header*/, 10000)); // XXX should not hardcode region size
|
||||
recordid scratch_root = scratch_tree->create(xid);
|
||||
|
||||
//save the old dp state values
|
||||
DataPage<datatuple>::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc();
|
||||
DataPage<datatuple>::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc();
|
||||
|
||||
//reinitialize the dp state
|
||||
//TODO: maybe you want larger regions for the second tree?
|
||||
// foo XXX Tset(xid, ltable->get_dpstate2(), &logtable::DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
|
||||
|
||||
//DataPage<datatuple>::RegionAllocator *newAlloc2 = new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2(), 10000); // XXX don't hardcode region length
|
||||
|
||||
scratch_tree->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2() /*rid of old header*/, 10000)); // XXX should not hardcode region size
|
||||
|
||||
|
||||
//pthread_mutex_unlock(a->block_ready_mut);
|
||||
unlock(ltable->mergedata->header_lock);
|
||||
|
||||
|
||||
|
@ -545,15 +527,12 @@ void *diskMergeThread(void*arg)
|
|||
|
||||
//force write the new region to disk
|
||||
recordid scratch_alloc_state = scratch_tree->get_tree_state();
|
||||
//TODO:
|
||||
//TlsmForce(xid,scratch_root,logtree::force_region_rid, &scratch_alloc_state);
|
||||
logtree::force_region_rid(xid, &scratch_alloc_state);
|
||||
//force write the new datapages
|
||||
scratch_tree->get_alloc()->force_regions(xid);
|
||||
|
||||
|
||||
//writes complete
|
||||
//now automically replace the old c2 with new c2
|
||||
//now atomically replace the old c2 with new c2
|
||||
//pthread_mutex_lock(a->block_ready_mut);
|
||||
writelock(ltable->mergedata->header_lock,0);
|
||||
|
||||
|
|
1
util/CMakeLists.txt
Normal file
1
util/CMakeLists.txt
Normal file
|
@ -0,0 +1 @@
|
|||
CREATE_CLIENT_EXECUTABLE(dump_blockmap)
|
42
util/dump_blockmap.cpp
Normal file
42
util/dump_blockmap.cpp
Normal file
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* dump_blockmap.cc
|
||||
*
|
||||
* Created on: Feb 16, 2010
|
||||
* Author: sears
|
||||
*/
|
||||
|
||||
#include "../tcpclient.h"
|
||||
#include "../network.h"
|
||||
#include "../datatuple.h"
|
||||
|
||||
void usage(char * argv[]) {
|
||||
fprintf(stderr, "usage %s [host [port]]\n", argv[0]);
|
||||
}
|
||||
|
||||
int main(int argc, char * argv[]) {
|
||||
bool ok = true;
|
||||
int svrport = 32432;
|
||||
char * svrname = "localhost";
|
||||
if(argc == 3) {
|
||||
svrport = atoi(argv[2]);
|
||||
}
|
||||
if(argc == 2 || argc == 3) {
|
||||
svrname = argv[1];
|
||||
}
|
||||
if(!ok || argc > 3) {
|
||||
usage(argv); return 1;
|
||||
}
|
||||
|
||||
logstore_handle_t * l = logstore_client_open(svrname, svrport, 100);
|
||||
|
||||
if(l == NULL) { perror("Couldn't open connection"); return 2; }
|
||||
|
||||
datatuple * t = datatuple::create("", 1);
|
||||
datatuple * ret = logstore_client_op(l, OP_DBG_BLOCKMAP, t);
|
||||
if(ret == NULL) {
|
||||
perror("Blockmap request failed."); return 3;
|
||||
}
|
||||
logstore_client_close(l);
|
||||
printf("Success\n");
|
||||
return 0;
|
||||
}
|
Loading…
Reference in a new issue