renamed some functions to match naming convention

This commit is contained in:
Sears Russell 2009-05-08 06:53:30 +00:00
parent 651a1a22e5
commit bc554efc85
17 changed files with 274 additions and 282 deletions

View file

@ -81,11 +81,11 @@ static int cmpFreespace(const void * ap, const void * bp, const void * param) {
}
}
inline static availablePage* getAvailablePage(allocationPolicy * ap, pageid_t pageid) {
inline static availablePage* getAvailablePage(stasis_allocation_policy_t * ap, pageid_t pageid) {
return (availablePage*) LH_ENTRY(find)(ap->allPages, &pageid, sizeof(pageid));
}
inline static void insert_xidAlloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void insert_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
if(!pages) {
@ -96,7 +96,7 @@ inline static void insert_xidAlloced(allocationPolicy * ap, int xid, availablePa
assert(check == p);
}
inline static void remove_xidAlloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void remove_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
assert(p->lockCount);
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
assert(pages);
@ -104,7 +104,7 @@ inline static void remove_xidAlloced(allocationPolicy * ap, int xid, availablePa
assert(check == p); // sometimes fails
}
inline static void insert_xidDealloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void insert_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
if(!pages) {
@ -117,7 +117,7 @@ inline static void insert_xidDealloced(allocationPolicy * ap, int xid, available
assert(check);
}
inline static void remove_xidDealloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void remove_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
assert(pages);
@ -125,7 +125,7 @@ inline static void remove_xidDealloced(allocationPolicy * ap, int xid, available
assert(check == p);
}
inline static int find_xidDealloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static int find_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
if(!pages) { return 0; }
const availablePage * check = RB_ENTRY(find)(p, pages);
@ -136,7 +136,7 @@ inline static int find_xidDealloced(allocationPolicy * ap, int xid, availablePag
}
}
inline static void lockAlloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void lockAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
const availablePage * check = RB_ENTRY(delete)(p, ap->availablePages);
assert(check == p);
@ -149,7 +149,7 @@ inline static void lockAlloced(allocationPolicy * ap, int xid, availablePage
insert_xidAlloced(ap, xid, p);
}
inline static void unlockAlloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void unlockAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
remove_xidAlloced(ap, xid, p);
p->lockCount--;
@ -165,7 +165,7 @@ inline static void unlockAlloced(allocationPolicy * ap, int xid, availablePage
}
inline static void lockDealloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void lockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
if(p->lockCount == 0) {
// xid should own it
lockAlloced(ap, xid, p);
@ -205,7 +205,7 @@ inline static void lockDealloced(allocationPolicy * ap, int xid, availablePage *
}
}
inline static void unlockDealloced(allocationPolicy * ap, int xid, availablePage * p) {
inline static void unlockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
assert(p->lockCount > 0);
p->lockCount--;
@ -219,8 +219,8 @@ inline static void unlockDealloced(allocationPolicy * ap, int xid, availablePage
}
}
allocationPolicy * allocationPolicyInit() {
allocationPolicy * ap = malloc(sizeof(allocationPolicy));
stasis_allocation_policy_t * stasis_allocation_policy_init() {
stasis_allocation_policy_t * ap = malloc(sizeof(stasis_allocation_policy_t));
ap->xidAlloced = LH_ENTRY(create)(10);
ap->xidDealloced = LH_ENTRY(create)(10);
@ -230,7 +230,7 @@ allocationPolicy * allocationPolicyInit() {
return ap;
}
void allocationPolicyDeinit(allocationPolicy * ap) {
void stasis_allocation_policy_deinit(stasis_allocation_policy_t * ap) {
const availablePage * next;
while(( next = RB_ENTRY(min)(ap->availablePages) )) {
@ -246,7 +246,7 @@ void allocationPolicyDeinit(allocationPolicy * ap) {
free(ap);
}
void allocationPolicyAddPages(allocationPolicy * ap, availablePage** newPages) {
void stasis_allocation_policy_register_new_pages(stasis_allocation_policy_t * ap, availablePage** newPages) {
for(int i = 0; newPages[i] != 0; i++) {
const availablePage * ret = RB_ENTRY(search)(newPages[i], ap->availablePages);
@ -260,7 +260,7 @@ void allocationPolicyAddPages(allocationPolicy * ap, availablePage** newPages) {
/// XXX need updateAlloced, updateFree, which remove a page, change
/// its freespace / lockCount, and then re-insert them into the tree.
availablePage * allocationPolicyFindPage(allocationPolicy * ap, int xid, int freespace) {
availablePage * stasis_allocation_policy_pick_suitable_page(stasis_allocation_policy_t * ap, int xid, int freespace) {
// For the best fit amongst the pages in availablePages, call:
//
// rblookup(RB_LUGREAT, key, availablePages)
@ -302,7 +302,7 @@ availablePage * allocationPolicyFindPage(allocationPolicy * ap, int xid, int fre
return (availablePage*) ret;
}
int allocationPolicyCanXidAllocFromPage(allocationPolicy *ap, int xid, pageid_t page) {
int stasis_allocation_policy_can_xid_alloc_from_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
availablePage * p = getAvailablePage(ap, page);
const availablePage * check1 = RB_ENTRY(find)(p, ap->availablePages);
int * xidp = LH_ENTRY(find)(ap->pageOwners, &(page), sizeof(page));
@ -319,7 +319,7 @@ int allocationPolicyCanXidAllocFromPage(allocationPolicy *ap, int xid, pageid_t
}
}
void allocationPolicyAllocedFromPage(allocationPolicy *ap, int xid, pageid_t page) {
void stasis_allocation_policy_alloced_from_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
availablePage * p = getAvailablePage(ap, page);
const availablePage * check1 = RB_ENTRY(find)(p, ap->availablePages);
int * xidp = LH_ENTRY(find)(ap->pageOwners, &(page), sizeof(page));
@ -343,7 +343,7 @@ void allocationPolicyAllocedFromPage(allocationPolicy *ap, int xid, pageid_t pag
}
}
void allocationPolicyLockPage(allocationPolicy *ap, int xid, pageid_t page) {
void stasis_allocation_policy_lock_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
availablePage * p = getAvailablePage(ap, page);
lockDealloced(ap, xid, p);
@ -351,7 +351,7 @@ void allocationPolicyLockPage(allocationPolicy *ap, int xid, pageid_t page) {
}
void allocationPolicyTransactionCompleted(allocationPolicy * ap, int xid) {
void stasis_allocation_policy_transaction_completed(stasis_allocation_policy_t * ap, int xid) {
struct RB_ENTRY(tree) * locks = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
@ -384,7 +384,7 @@ void allocationPolicyTransactionCompleted(allocationPolicy * ap, int xid) {
}
void allocationPolicyUpdateFreespaceUnlockedPage(allocationPolicy * ap, availablePage * key, int newFree) {
void stasis_allocation_policy_update_freespace_unlocked_page(stasis_allocation_policy_t * ap, availablePage * key, int newFree) {
availablePage * p = (availablePage*) RB_ENTRY(delete)(key, ap->availablePages);
assert(key == p);
p->freespace = newFree;
@ -392,7 +392,7 @@ void allocationPolicyUpdateFreespaceUnlockedPage(allocationPolicy * ap, availabl
assert(ret == p);
}
void allocationPolicyUpdateFreespaceLockedPage(allocationPolicy * ap, int xid, availablePage * key, int newFree) {
void stasis_allocation_policy_update_freespace_locked_page(stasis_allocation_policy_t * ap, int xid, availablePage * key, int newFree) {
struct RB_ENTRY(tree) * locks = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
assert(key);
availablePage * p = (availablePage*) RB_ENTRY(delete)(key, locks);

View file

@ -4,7 +4,7 @@
#include <assert.h>
void allocBlob(int xid, recordid rid) {
void stasis_blob_alloc(int xid, recordid rid) {
assert(rid.size>0);
pageid_t pageCount = (rid.size / USABLE_SIZE_OF_PAGE) + ((rid.size % USABLE_SIZE_OF_PAGE) ? 1 : 0);
long startPage = TpageAllocMany(xid, pageCount);
@ -17,38 +17,30 @@ void allocBlob(int xid, recordid rid) {
DEBUG("rid = {%d %d %d}\n", rid.page, rid.slot, rid.size);
}
void deallocBlob(int xid, blob_record_t *r) {
/* Page *p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
blob_record_t r;
rid.size = sizeof(blob_record_t);
stasis_record_read(xid, p, rid, (byte*)&r);
unlock(p->rwlatch);
releasePage(p); */
void stasis_blob_dealloc(int xid, blob_record_t *r) {
TregionDealloc(xid, r->offset);
}
void readBlob(int xid, Page * p2, recordid rid, byte * buf) {
void stasis_blob_read(int xid, Page * p, recordid rid, byte * buf) {
pageid_t chunk;
recordid rawRid = rid;
rawRid.size = BLOB_SLOT;
byte * pbuf = alloca(PAGE_SIZE);
blob_record_t rec;
stasis_record_read(xid, p2, rawRid, (byte*)&rec);
stasis_record_read(xid, p, rawRid, (byte*)&rec);
for(chunk = 0; (chunk+1) * USABLE_SIZE_OF_PAGE < rid.size; chunk++) {
//printf("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
DEBUG("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, USABLE_SIZE_OF_PAGE);
}
TpageGet(xid, rec.offset+chunk, pbuf);
memcpy(buf + (chunk * USABLE_SIZE_OF_PAGE), pbuf, rid.size % USABLE_SIZE_OF_PAGE);
//printf("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
DEBUG("Chunk = %d->%lld\n", chunk, (long long)rec.offset+chunk);
}
void writeBlob(int xid, Page * p, recordid rid, const void* dat) {
void stasis_blob_write(int xid, Page * p, recordid rid, const void* dat) {
blob_record_t rec;
recordid r = rid;
r.size = sizeof(blob_record_t);
@ -60,7 +52,7 @@ void writeBlob(int xid, Page * p, recordid rid, const void* dat) {
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
stasis_blob_initialize_page(cnk);
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
// Don't care about race; writes in race have undefined semantics...
@ -69,7 +61,7 @@ void writeBlob(int xid, Page * p, recordid rid, const void* dat) {
Page * cnk = loadPage(xid, rec.offset+chunk);
writelock(cnk->rwlatch,0);
if(*stasis_page_type_ptr(cnk) != BLOB_PAGE) {
stasis_blob_initialize_page(cnk);
stasis_page_blob_initialize(cnk);
}
unlock(cnk->rwlatch);
byte * buf = calloc(1,USABLE_SIZE_OF_PAGE);
@ -77,47 +69,47 @@ void writeBlob(int xid, Page * p, recordid rid, const void* dat) {
TpageSetRange(xid,rec.offset+chunk,0,buf,USABLE_SIZE_OF_PAGE);
free(buf);
}
static int notSupported(int xid, Page * p) { return 0; }
static int stasis_page_not_supported(int xid, Page * p) { return 0; }
static void blobLoaded(Page *p) {
static void stasis_page_blob_loaded(Page *p) {
p->LSN = *stasis_page_lsn_ptr(p);
DEBUG("load lsn: %lld\n", (long long)p->LSN);
}
static void blobFlushed(Page *p) {
static void stasis_page_blob_flushed(Page *p) {
*stasis_page_lsn_ptr(p) = p->LSN;
DEBUG("flush lsn: %lld\n", (long long)p->LSN);
}
static void blobCleanup(Page *p) { }
static void stasis_page_blob_cleanup(Page *p) { }
static page_impl pi = {
BLOB_PAGE,
0, //read,
0, //write,
0, //readDone
0, //writeDone
0, //getType,
0, //setType,
0, //getLength,
0, //recordFirst,
0, //recordNext,
notSupported, // is block supported
0, //pageGenericBlockFirst,
0, //pageGenericBlockNext,
0, //pageGenericBlockDone,
0, //freespace,
0, //compact,
0, //preRalloc,
0, //postRalloc,
0, //Free,
0, //XXX page_impl_dereference_identity,
blobLoaded,
blobFlushed,
blobCleanup
};
page_impl blobImpl() {
page_impl stasis_page_blob_impl() {
page_impl pi = {
BLOB_PAGE,
0, //read,
0, //write,
0, //readDone
0, //writeDone
0, //getType,
0, //setType,
0, //getLength,
0, //recordFirst,
0, //recordNext,
stasis_page_not_supported, // is block supported
0, //pageGenericBlockFirst,
0, //pageGenericBlockNext,
0, //pageGenericBlockDone,
0, //freespace,
0, //compact,
0, //preRalloc,
0, //postRalloc,
0, //Free,
0, //XXX page_impl_dereference_identity,
stasis_page_blob_loaded,
stasis_page_blob_flushed,
stasis_page_blob_cleanup
};
return pi;
}
void stasis_blob_initialize_page(Page * p) {
void stasis_page_blob_initialize(Page * p) {
assertlocked(p->rwlatch);
DEBUG("lsn: %lld\n",(long long)p->LSN);
stasis_page_cleanup(p);

View file

@ -84,7 +84,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
recordid nextRid = arrayList;
nextRid.slot = node[i];
Page * p = loadPage(xid, arrayList.page); // just pin it forever and ever
nextRid = dereferenceArrayListRid(xid, p, nextRid.slot);
nextRid = stasis_array_list_dereference_recordid(xid, p, nextRid.slot);
releasePage(p);
int thisFifo = stasis_crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned int)-1) % pool->fifoCount;

View file

@ -28,8 +28,8 @@
Here are some requirements for alloc:
[DONE] Space Reuse: There are many ways to implement this. One method
(that I'm not particularly attached to) is to maintain seperate
linked lists for each type of page, seperated by an estimate of the
(that I'm not particularly attached to) is to maintain separate
linked lists for each type of page, separated by an estimate of the
amount of space free (actually 'un-reserved'; see below) on the
page. Allocation would move pages between linked lists, and search
in the appropriate linked list before expanding the page file.
@ -42,7 +42,7 @@
This would tell Talloc to treat the page as though 'size' bytes had
already been reserved. The 'free space' that Talloc () reasons
about would be: max(reservedSpace, usedSpace). A seperate call,
about would be: max(reservedSpace, usedSpace). A separate call,
TallocFromPage (xid, page, size) already exists, and should ignore
the presence of the 'reserved space' field.
@ -197,20 +197,20 @@ stasis_operation_impl stasis_op_impl_realloc() {
}
static pageid_t lastFreepage;
static allocationPolicy * allocPolicy;
static void registerOldRegions();
static stasis_allocation_policy_t * allocPolicy;
static void stasis_alloc_register_old_regions();
void TallocInit() {
lastFreepage = PAGEID_T_MAX;
allocPolicy = allocationPolicyInit();
allocPolicy = stasis_allocation_policy_init();
}
void TallocPostInit() {
registerOldRegions();
stasis_alloc_register_old_regions();
}
void TallocDeinit() {
allocationPolicyDeinit(allocPolicy);
stasis_allocation_policy_deinit(allocPolicy);
}
static void registerOldRegions() {
static void stasis_alloc_register_old_regions() {
pageid_t boundary = REGION_FIRST_TAG;
boundary_tag t;
DEBUG("registering old regions\n");
@ -237,14 +237,14 @@ static void registerOldRegions() {
releasePage(p);
}
newPages[t.size]=0;
allocationPolicyAddPages(allocPolicy, newPages);
stasis_allocation_policy_register_new_pages(allocPolicy, newPages);
free(newPages);
}
} while(TregionNextBoundaryTag(-1, &boundary, &t, 0)); //STORAGE_MANAGER_TALLOC)) {
}
}
static void reserveNewRegion(int xid) {
static void stasis_alloc_reserve_new_region(int xid) {
void* nta = TbeginNestedTopAction(xid, OPERATION_NOOP, 0,0);
pageid_t firstPage = TregionAlloc(xid, TALLOC_REGION_SIZE, STORAGE_MANAGER_TALLOC);
@ -269,7 +269,7 @@ static void reserveNewRegion(int xid) {
newPages[i] = next;
}
newPages[TALLOC_REGION_SIZE]= 0;
allocationPolicyAddPages(allocPolicy, newPages);
stasis_allocation_policy_register_new_pages(allocPolicy, newPages);
free(newPages); // Don't free the structs it points to; they are in use by the allocation policy.
TendNestedTopAction(xid, nta);
@ -292,12 +292,12 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
Page * p;
availablePage * ap =
allocationPolicyFindPage(allocPolicy, xid,
stasis_allocation_policy_pick_suitable_page(allocPolicy, xid,
stasis_record_type_to_size(type));
if(!ap) {
reserveNewRegion(xid);
ap = allocationPolicyFindPage(allocPolicy, xid,
stasis_alloc_reserve_new_region(xid);
ap = stasis_allocation_policy_pick_suitable_page(allocPolicy, xid,
stasis_record_type_to_size(type));
}
lastFreepage = ap->pageid;
@ -314,20 +314,20 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
unlock(p->rwlatch);
if(!ap->lockCount) {
allocationPolicyUpdateFreespaceUnlockedPage(allocPolicy, ap,
stasis_allocation_policy_update_freespace_unlocked_page(allocPolicy, ap,
newFreespace);
} else {
allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap,
stasis_allocation_policy_update_freespace_locked_page(allocPolicy, xid, ap,
newFreespace);
}
releasePage(p);
ap = allocationPolicyFindPage(allocPolicy, xid,
ap = stasis_allocation_policy_pick_suitable_page(allocPolicy, xid,
stasis_record_type_to_size(type));
if(!ap) {
reserveNewRegion(xid);
ap = allocationPolicyFindPage(allocPolicy, xid,
stasis_alloc_reserve_new_region(xid);
ap = stasis_allocation_policy_pick_suitable_page(allocPolicy, xid,
stasis_record_type_to_size(type));
}
@ -343,8 +343,8 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
stasis_record_alloc_done(xid, p, rid);
int newFreespace = stasis_record_freespace(xid, p);
allocationPolicyAllocedFromPage(allocPolicy, xid, ap->pageid);
allocationPolicyUpdateFreespaceLockedPage(allocPolicy, xid, ap, newFreespace);
stasis_allocation_policy_alloced_from_page(allocPolicy, xid, ap->pageid);
stasis_allocation_policy_update_freespace_locked_page(allocPolicy, xid, ap, newFreespace);
unlock(p->rwlatch);
alloc_arg a = { rid.slot, type };
@ -353,7 +353,7 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
if(type == BLOB_SLOT) {
rid.size = size;
allocBlob(xid, rid);
stasis_blob_alloc(xid, rid);
}
releasePage(p);
@ -361,16 +361,16 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
return rid;
}
void allocTransactionAbort(int xid) {
void stasis_alloc_aborted(int xid) {
begin_action(pthread_mutex_unlock, &talloc_mutex) {
pthread_mutex_lock(&talloc_mutex);
allocationPolicyTransactionCompleted(allocPolicy, xid);
stasis_allocation_policy_transaction_completed(allocPolicy, xid);
} compensate;
}
void allocTransactionCommit(int xid) {
void stasis_alloc_committed(int xid) {
begin_action(pthread_mutex_unlock, &talloc_mutex) {
pthread_mutex_lock(&talloc_mutex);
allocationPolicyTransactionCompleted(allocPolicy, xid);
stasis_allocation_policy_transaction_completed(allocPolicy, xid);
} compensate;
}
@ -384,7 +384,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo
}
pthread_mutex_lock(&talloc_mutex);
if(!allocationPolicyCanXidAllocFromPage(allocPolicy, xid, page)) {
if(!stasis_allocation_policy_can_xid_alloc_from_page(allocPolicy, xid, page)) {
pthread_mutex_unlock(&talloc_mutex);
return NULLRID;
}
@ -395,7 +395,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo
if(rid.size != INVALID_SLOT) {
stasis_record_alloc_done(xid,p,rid);
allocationPolicyAllocedFromPage(allocPolicy, xid, page);
stasis_allocation_policy_alloced_from_page(allocPolicy, xid, page);
unlock(p->rwlatch);
alloc_arg a = { rid.slot, type };
@ -404,7 +404,7 @@ compensated_function recordid TallocFromPage(int xid, pageid_t page, unsigned lo
if(type == BLOB_SLOT) {
rid.size = size;
allocBlob(xid,rid);
stasis_blob_alloc(xid,rid);
}
} else {
@ -428,7 +428,7 @@ compensated_function void Tdealloc(int xid, recordid rid) {
readlock(p->rwlatch,0);
recordid newrid = stasis_record_dereference(xid, p, rid);
allocationPolicyLockPage(allocPolicy, xid, newrid.page);
stasis_allocation_policy_lock_page(allocPolicy, xid, newrid.page);
int64_t size = stasis_record_length_read(xid,p,rid);
int64_t type = stasis_record_type_read(xid,p,rid);
@ -470,7 +470,7 @@ compensated_function void Tdealloc(int xid, recordid rid) {
pthread_mutex_unlock(&talloc_mutex);
if(type==BLOB_SLOT) {
deallocBlob(xid,(blob_record_t*)(preimage+sizeof(alloc_arg)));
stasis_blob_dealloc(xid,(blob_record_t*)(preimage+sizeof(alloc_arg)));
}
free(preimage);

View file

@ -196,7 +196,7 @@ compensated_function int TarrayListLength(int xid, recordid rid) {
/*----------------------------------------------------------------------------*/
recordid dereferenceArrayListRid(int xid, Page * p, int offset) {
recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) {
readlock(p->rwlatch,0);
TarrayListParameters tlp = pageToTLP(xid, p);

View file

@ -39,33 +39,33 @@
@file
*/
static pthread_mutex_t linked_list_mutex;
static pthread_mutex_t stasis_linked_list_mutex;
void LinkedListNTAInit() {
void TlinkedListNTAInit() {
// only need this function since PTHREAD_RECURSIVE_MUTEX_INITIALIZER is really broken...
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&linked_list_mutex, &attr);
pthread_mutex_init(&stasis_linked_list_mutex, &attr);
}
compensated_function static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize);
compensated_function static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize);
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize);
typedef struct {
recordid list;
int keySize;
} stasis_linkedListInsert_log;
} stasis_linked_list_insert_log;
typedef struct {
recordid list;
int keySize;
int valueSize;
} stasis_linkedListRemove_log;
} stasis_linked_list_remove_log;
compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Page* p) {
assert(!p);
stasis_linkedListRemove_log * log = (stasis_linkedListRemove_log*)getUpdateArgs(e);;
stasis_linked_list_remove_log * log = (stasis_linked_list_remove_log*)getUpdateArgs(e);;
byte * key;
byte * value;
@ -75,12 +75,12 @@ compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Pag
valueSize = log->valueSize;
key = (byte*)(log+1);
value = ((byte*)(log+1))+keySize;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
// printf("Operate insert called: rid.page = %d keysize = %d valuesize = %d %d {%d %d %d}\n", rid.page, log->keySize, log->valueSize, *(int*)key, value->page, value->slot, value->size);
// Skip writing the undo! Recovery will write a CLR after we're done, effectively
// wrapping this in a nested top action, so we needn't worry about that either.
__TlinkedListInsert(e->xid, log->list, key, keySize, value, valueSize);
stasis_linked_list_insert_helper(e->xid, log->list, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
@ -88,18 +88,18 @@ compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Pag
}
compensated_function static int op_linked_list_nta_remove(const LogEntry *e, Page* p) {
assert(!p);
stasis_linkedListRemove_log * log = (stasis_linkedListRemove_log*)getUpdateArgs(e);
stasis_linked_list_remove_log * log = (stasis_linked_list_remove_log*)getUpdateArgs(e);
byte * key;
int keySize;
keySize = log->keySize;
key = (byte*)(log+1);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
// printf("Operate remove called: %d\n", *(int*)key);
// Don't call the version that writes an undo entry!
__TlinkedListRemove(e->xid, log->list, key, keySize);
stasis_linked_list_remove_helper(e->xid, log->list, key, keySize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
@ -111,17 +111,17 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
ret = TlinkedListRemove(xid, list, key, keySize);
} end_ret(compensation_error()); */
stasis_linkedListInsert_log * undoLog = malloc(sizeof(stasis_linkedListInsert_log) + keySize);
stasis_linked_list_insert_log * undoLog = malloc(sizeof(stasis_linked_list_insert_log) + keySize);
undoLog->list = list;
undoLog->keySize = keySize;
memcpy(undoLog+1, key, keySize);
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(stasis_linkedListInsert_log) + keySize);
(byte*)undoLog, sizeof(stasis_linked_list_insert_log) + keySize);
free(undoLog);
__TlinkedListInsert(xid, list, key, keySize, value, valueSize);
stasis_linked_list_insert_helper(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
@ -148,7 +148,7 @@ stasis_operation_impl stasis_op_impl_linked_list_remove() {
};
return o;
}
compensated_function static void __TlinkedListInsert(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
compensated_function static void stasis_linked_list_insert_helper(int xid, recordid list, const byte * key, int keySize, const byte * value, int valueSize) {
//int ret = Tli nkedListRemove(xid, list, key, keySize);
try {
@ -182,20 +182,20 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
stasis_linkedList_entry * entry = malloc(list.size);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, -2) {
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, -2) {
pthread_mutex_lock(&stasis_linked_list_mutex);
Tread(xid, list, entry);
} end_action_ret(-2);
if(!entry->next.size) {
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return -1; // empty list
}
int done = 0;
int ret = -1;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, -2) {
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, -2) {
while(!done) {
if(!memcmp(entry + 1, key, keySize)) {
@ -225,20 +225,20 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
compensated_function int TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
byte * value;
int valueSize;
pthread_mutex_lock(&linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
int ret;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
ret = TlinkedListFind(xid, list, key, keySize, &value);
} end_action_ret(compensation_error());
if(ret != -1) {
valueSize = ret;
} else {
pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 0;
}
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
int entrySize = sizeof(stasis_linkedListRemove_log) + keySize + valueSize;
stasis_linkedListRemove_log * undoLog = malloc(entrySize);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
int entrySize = sizeof(stasis_linked_list_remove_log) + keySize + valueSize;
stasis_linked_list_remove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
@ -247,12 +247,12 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(stasis_linkedListRemove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
// entrySize, sizeof(stasis_linked_list_remove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
__TlinkedListRemove(xid, list, key, keySize);
stasis_linked_list_remove_helper(xid, list, key, keySize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
@ -260,18 +260,18 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
return 1;
}
compensated_function static int __TlinkedListRemove(int xid, recordid list, const byte * key, int keySize) {
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize) {
stasis_linkedList_entry * entry = malloc(list.size);
pthread_mutex_lock(&linked_list_mutex);
pthread_mutex_lock(&stasis_linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
Tread(xid, list, entry);
} end_action_ret(compensation_error());
if(entry->next.size == 0) {
//Empty List.
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return 0;
}
int listRoot = 1;
@ -280,7 +280,7 @@ compensated_function static int __TlinkedListRemove(int xid, recordid list, cons
oldLastRead.size = -2;
int ret = 0;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
while(1) {
if(compensation_error()) { break; }
@ -333,8 +333,8 @@ compensated_function static int __TlinkedListRemove(int xid, recordid list, cons
compensated_function int TlinkedListMove(int xid, recordid start_list, recordid end_list, const byte *key, int keySize) {
byte * value = 0;
int ret;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
int valueSize = TlinkedListFind(xid, start_list, key, keySize, &value);
if(valueSize != -1) {
// pthread_mutex_unlock(&linked_list_mutex);
@ -406,8 +406,8 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
int done = 0;
int ret = 0;
stasis_linkedList_entry * entry;
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
if(it->first == -1) {
it->first = 1;
@ -433,11 +433,11 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
} end_action_ret(compensation_error());
if(done) {
pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}
begin_action_ret(pthread_mutex_unlock, &linked_list_mutex, compensation_error()) {
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
assert(it->keySize + it->valueSize + sizeof(stasis_linkedList_entry) == it->next.size);
entry = malloc(it->next.size);
Tread(xid, it->next, entry);
@ -462,6 +462,6 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
}
free(entry);
pthread_mutex_unlock(&linked_list_mutex);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}

View file

@ -38,23 +38,23 @@ static void rehash(int xid, recordid hash, pageid_t next_split, pageid_t i, unsi
static void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next_split);
static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry);
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete);
static int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize);
int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
static int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key, int keySize, void * val, int valSize) {
int found;
try_ret(compensation_error()) {
try_ret(compensation_error()) {
hashEntry * e = malloc(sizeof(hashEntry) + keySize + valSize);
recordid nextEntry;
hashRid.slot = bucket_number;
nextEntry = hashRid;
found = 0;
while(nextEntry.size != -1 && nextEntry.size != 0) {
if(compensation_error()) { break; }
assert(nextEntry.size == sizeof(hashEntry) + keySize + valSize);
@ -65,15 +65,15 @@ int findInBucket(int xid, recordid hashRid, int bucket_number, const void * key,
break;
}
nextEntry = e->next;
}
}
free(e);
} end_ret(compensation_error());
return found;
}
void expand(int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
static void expand(int xid, recordid hash, int next_split, int i, int keySize, int valSize) {
/* Total hack; need to do this better, by storing stuff in the hash table headers.*/
static int count = 4096 * .25;
@ -90,72 +90,72 @@ void expand(int xid, recordid hash, int next_split, int i, int keySize, int valS
if(next_split >= stasis_util_two_to_the(i-1)+2) {
i++;
next_split = 2;
}
rehash(xid, hash, next_split, i, keySize, valSize);
}
rehash(xid, hash, next_split, i, keySize, valSize);
next_split++;
headerNextSplit = next_split;
headerHashBits = i;
headerHashBits = i;
}
update_hash_header(xid, hash, i, next_split);
update_hash_header(xid, hash, i, next_split);
} end;
}
}
void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next_split) {
try {
static void update_hash_header(int xid, recordid hash, pageid_t i, pageid_t next_split) {
try {
hashEntry * he = pblHtLookup(openHashes, &(hash.page), sizeof(hash.page));
assert(he);
recordid * headerRidB = &he->next;
assert(headerRidB);
headerHashBits = i;
headerNextSplit = next_split;
hash.slot = 1;
Tset(xid, hash, headerRidB);
} end;
}
void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned int keySize, unsigned int valSize) {
try {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
static void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned int keySize, unsigned int valSize) {
try {
int firstA = 1; // Is 'A' the recordid of a bucket?
int firstD = 1; // What about 'D'?
assert(hashRid.size == sizeof(hashEntry) + keySize + valSize);
recordid ba = hashRid; ba.slot = next_split;
recordid bb = hashRid; bb.slot = next_split + stasis_util_two_to_the(i-1);
hashEntry * D_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * A_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
hashEntry * B_contents = calloc(1,sizeof(hashEntry) + keySize + valSize);
Tread(xid, ba, A_contents);
Tread(xid, bb, D_contents);
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid A = ba; //ba_contents;
recordid D = bb; //bb_contents;
recordid B = A_contents->next;
recordid C;
if(!A_contents->next.size) {
recordid C;
if(!A_contents->next.size) {
/* Bucket A is empty, so we're done. */
free(D_contents);
free(A_contents);
free(B_contents);
return;
return;
}
uint64_t old_hash;
uint64_t new_hash =
2 + stasis_linear_hash(A_contents+1, keySize, i, UINT_MAX);
while(new_hash != next_split) {
// Need a record in A that belongs in the first bucket...
// Need a record in A that belongs in the first bucket...
recordid oldANext = A_contents->next;
A_contents->next = NULLRID;
if(firstD) {
// assert(memcmp(&A_contents->next, &D_contents->next, sizeof(recordid)));
Tset(xid, D, A_contents);
@ -179,9 +179,9 @@ void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned
hashEntry * swap = D_contents;
D_contents = A_contents;
A_contents = swap;
/* A_contents is now garbage. */
assert(A.size == sizeof(hashEntry) + keySize + valSize);
if(oldANext.size == -1) {
memset(A_contents, 0, sizeof(hashEntry) + keySize + valSize);
@ -191,29 +191,29 @@ void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned
free(A_contents);
free(B_contents);
return;
}
}
assert(oldANext.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, oldANext, A_contents);
// assert(memcmp(&A_contents->next, &A, sizeof(recordid)));
Tset(xid, A, A_contents);
Tdealloc(xid, oldANext);
new_hash = stasis_linear_hash(A_contents+1, keySize, i, UINT_MAX) + 2;
}
B = A_contents->next;
while(B.size != -1) {
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
C = B_contents->next;
old_hash = stasis_linear_hash(B_contents+1, keySize, i-1, UINT_MAX) + 2;
new_hash = stasis_linear_hash(B_contents+1, keySize, i, UINT_MAX) + 2;
assert(next_split == old_hash);
assert(next_split == old_hash);
assert(new_hash == old_hash || new_hash == old_hash + stasis_util_two_to_the(i-1));
if(new_hash == old_hash) {
A = B;
B = C;
@ -222,27 +222,27 @@ void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned
} else {
assert(D.size == sizeof(hashEntry) + keySize + valSize);
assert(B.size == -1 || B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, D, D_contents);
Tread(xid, D, D_contents);
D_contents->next = B;
assert(B.size != 0);
Tset(xid, D, D_contents);
// A is somewhere in the first list.
// A is somewhere in the first list.
assert(A.size == sizeof(hashEntry) + keySize + valSize);
assert(C.size == -1 || C.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, A, A_contents);
A_contents->next = C;
assert(C.size != 0);
Tset(xid, A, A_contents);
// B _can't_ be a bucket.
assert(B.size == sizeof(hashEntry) + keySize + valSize);
Tread(xid, B, B_contents);
B_contents->next = NULLRID;
Tset(xid, B, B_contents);
// Update Loop State
// Update Loop State
D = B;
B = C;
C.size = -1;
@ -254,7 +254,7 @@ void rehash(int xid, recordid hashRid, pageid_t next_split, pageid_t i, unsigned
free(B_contents);
} end;
}
void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
static void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry * bucket_contents,
hashEntry * e, int keySize, int valSize, int skipDelete) {
recordid deleteMe;
if(!skipDelete) {
@ -294,10 +294,10 @@ void insertIntoBucket(int xid, recordid hashRid, int bucket_number, hashEntry *
assert(bucket_contents->next.size);
}
int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
static int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * bucket_contents,
void * key, int keySize, int valSize, recordid * deletedEntry) {
if(bucket_contents->next.size == 0) { return 0; }
recordid this = hash;
this.slot = bucket_number;
@ -341,10 +341,10 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * buck
A->next = B->next;
assert(Aaddr.size == sizeof(hashEntry) + keySize + valSize);
Tset(xid, Aaddr, A);
if(deletedEntry) {
if(deletedEntry) {
*deletedEntry = Baddr;
}
found = 1;
found = 1;
break;
}
@ -354,7 +354,7 @@ int deleteFromBucket(int xid, recordid hash, int bucket_number, hashEntry * buck
free(B);
return found;
}
}
recordid TnaiveHashCreate(int xid, int keySize, int valSize) {
/* Want 16 buckets, doubling on overflow. */
@ -366,22 +366,22 @@ recordid TnaiveHashCreate(int xid, int keySize, int valSize) {
recordid * headerRidB = calloc (1, sizeof(recordid) + keySize + valSize);
assert(headerRidB);
headerKeySize = keySize;
headerValSize = valSize;
headerNextSplit = INT_MAX;
headerHashBits = 12;
assert(headerRidB);
rid.slot =0;
Tset(xid, rid, headerRidA);
rid.slot =1;
Tset(xid, rid, headerRidB);
assert(headerRidB);
pblHtInsert(openHashes, &(rid.page), sizeof(rid.page), headerRidB);
assert(headerRidB);
@ -414,8 +414,8 @@ void TnaiveHashDeinit() {
pblHtDelete(openHashes);
}
void TnaiveHashInsert(int xid, recordid hashRid,
void * key, int keySize,
void TnaiveHashInsert(int xid, recordid hashRid,
void * key, int keySize,
void * val, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(hashRid.page));
@ -431,8 +431,8 @@ void TnaiveHashInsert(int xid, recordid hashRid,
hashRid.slot = bucket;
Tread(xid, hashRid, bucket_contents);
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
insertIntoBucket(xid, hashRid, bucket, bucket_contents, e, keySize, valSize, 0);
expand(xid, hashRid, headerNextSplit, headerHashBits, keySize, valSize);
free(bucket_contents);
free(e);
@ -440,7 +440,7 @@ void TnaiveHashInsert(int xid, recordid hashRid,
}
/** @todo hash hable probably should track the number of items in it,
so that expand can be selectively called. */
int TnaiveHashDelete(int xid, recordid hashRid,
int TnaiveHashDelete(int xid, recordid hashRid,
void * key, int keySize, int valSize) {
recordid * headerRidB = pblHtLookup(openHashes, &(hashRid.page), sizeof(hashRid.page));
@ -467,7 +467,7 @@ int TnaiveHashOpen(int xid, recordid hashRid, int keySize, int valSize) {
recordid * headerRidB = malloc(sizeof(recordid) + keySize + valSize);
hashRid.slot = 1;
Tread(xid, hashRid, headerRidB);
pblHtInsert(openHashes, &(hashRid.page), sizeof(hashRid.page), headerRidB);
return 0;

View file

@ -96,7 +96,7 @@ int Tset(int xid, recordid rid, const void * dat) {
short type = stasis_record_type_read(xid,p,rid);
if(type == BLOB_SLOT) {
writeBlob(xid,p,rid,dat);
stasis_blob_write(xid,p,rid,dat);
unlock(p->rwlatch);
releasePage(p);
} else {

View file

@ -123,7 +123,7 @@ void stasis_page_init() {
stasis_page_impl_register(fixedImpl());
stasis_page_impl_register(boundaryTagImpl());
stasis_page_impl_register(arrayListImpl());
stasis_page_impl_register(blobImpl());
stasis_page_impl_register(stasis_page_blob_impl());
stasis_page_impl_register(indirectImpl());
stasis_page_impl_register(lsmRootImpl());
stasis_page_impl_register(slottedLsnFreeImpl());
@ -180,7 +180,7 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
if(page_type == INDIRECT_PAGE) {
rid = dereferenceIndirectRID(xid, rid);
} else if(page_type == ARRAY_LIST_PAGE) {
rid = dereferenceArrayListRid(xid, p, rid.slot);
rid = stasis_array_list_dereference_recordid(xid, p, rid.slot);
}
return rid;
}

View file

@ -87,7 +87,7 @@ int Tinit() {
TallocInit();
TnaiveHashInit();
LinearHashNTAInit();
LinkedListNTAInit();
TlinkedListNTAInit();
iterator_init();
consumer_init();
setupLockManagerCallbacksNil();
@ -254,7 +254,7 @@ compensated_function void Tread(int xid, recordid rid, void * dat) {
short type = stasis_record_type_read(xid,p,rid);
if(type == BLOB_SLOT) {
DEBUG("call readBlob %lld %lld %lld\n", (long long)rid.page, (long long)rid.slot, (long long)rid.size);
readBlob(xid,p,rid,dat);
stasis_blob_read(xid,p,rid,dat);
assert(rid.page == p->id);
} else {
stasis_record_read(xid, p, rid, dat);
@ -286,7 +286,7 @@ int Tcommit(int xid) {
lsn = LogTransCommit(stasis_log_file, &stasis_transaction_table[xid % MAX_TRANSACTIONS]);
if(globalLockManager.commit) { globalLockManager.commit(xid); }
allocTransactionCommit(xid);
stasis_alloc_committed(xid);
pthread_mutex_lock(&stasis_transaction_table_mutex);
@ -320,7 +320,7 @@ int Tabort(int xid) {
undoTrans(stasis_log_file, *t);
if(globalLockManager.abort) { globalLockManager.abort(xid); }
allocTransactionAbort(xid);
stasis_alloc_aborted(xid);
return 0;
}

View file

@ -4,7 +4,7 @@
#include <stasis/common.h>
struct allocationPolicy;
typedef struct allocationPolicy allocationPolicy;
typedef struct allocationPolicy stasis_allocation_policy_t;
typedef struct availablePage {
int freespace;
@ -12,15 +12,15 @@ typedef struct availablePage {
int lockCount; // Number of active transactions that have alloced or dealloced from this page.
} availablePage;
allocationPolicy * allocationPolicyInit();
void allocationPolicyDeinit(allocationPolicy * ap);
void allocationPolicyAddPages(allocationPolicy * ap, availablePage** newPages);
availablePage * allocationPolicyFindPage(allocationPolicy * ap, int xid, int freespace);
void allocationPolicyTransactionCompleted(allocationPolicy * ap, int xid);
void allocationPolicyUpdateFreespaceUnlockedPage(allocationPolicy * ap, availablePage * key, int newFree);
void allocationPolicyUpdateFreespaceLockedPage(allocationPolicy * ap, int xid, availablePage * key, int newFree);
void allocationPolicyLockPage(allocationPolicy * ap, int xid, pageid_t page);
void allocationPolicyAllocedFromPage(allocationPolicy * ap, int xid, pageid_t page);
stasis_allocation_policy_t * stasis_allocation_policy_init();
void stasis_allocation_policy_deinit(stasis_allocation_policy_t * ap);
void stasis_allocation_policy_register_new_pages(stasis_allocation_policy_t * ap, availablePage** newPages);
availablePage * stasis_allocation_policy_pick_suitable_page(stasis_allocation_policy_t * ap, int xid, int freespace);
void stasis_allocation_policy_transaction_completed(stasis_allocation_policy_t * ap, int xid);
void stasis_allocation_policy_update_freespace_unlocked_page(stasis_allocation_policy_t * ap, availablePage * key, int newFree);
void stasis_allocation_policy_update_freespace_locked_page(stasis_allocation_policy_t * ap, int xid, availablePage * key, int newFree);
void stasis_allocation_policy_lock_page(stasis_allocation_policy_t * ap, int xid, pageid_t page);
void stasis_allocation_policy_alloced_from_page(stasis_allocation_policy_t * ap, int xid, pageid_t page);
/**
Check to see if it is safe to allocate from a particular page.
@ -35,5 +35,5 @@ void allocationPolicyAllocedFromPage(allocationPolicy * ap, int xid, pageid_t pa
@param page The page that will be allocated from.
@return true if the allocation would be safe. false if not sure.
*/
int allocationPolicyCanXidAllocFromPage(allocationPolicy * ap, int xid, pageid_t page);
int stasis_allocation_policy_can_xid_alloc_from_page(stasis_allocation_policy_t * ap, int xid, pageid_t page);
#endif // ALLOCATION_POLICY_H

View file

@ -52,13 +52,13 @@ BEGIN_C_DECLS
/**
Read the blob from the recordid rid into buf.
*/
void readBlob(int xid, Page * p, recordid rid, void * buf);
void stasis_blob_read(int xid, Page * p, recordid rid, void * buf);
/**
Write the contents of buf to the blob in recordid rid.
*/
void writeBlob(int xid, Page * p, recordid rid, const void * buf);
void stasis_blob_write(int xid, Page * p, recordid rid, const void * buf);
compensated_function recordid preAllocBlob(int xid, long blobsize);
compensated_function recordid preAllocBlobFromPage(int xid, long page, long blobsize);
@ -67,10 +67,10 @@ compensated_function recordid preAllocBlobFromPage(int xid, long page, long blob
Allocate a blob of size blobSize.
*/
void allocBlob(int xid, recordid rid);
void deallocBlob(int xid, blob_record_t* r);
void stasis_blob_alloc(int xid, recordid rid);
void stasis_blob_dealloc(int xid, blob_record_t* r);
page_impl blobImpl();
page_impl stasis_page_blob_impl();
END_C_DECLS

View file

@ -16,8 +16,8 @@ stasis_operation_impl stasis_op_impl_alloc();
stasis_operation_impl stasis_op_impl_dealloc();
stasis_operation_impl stasis_op_impl_realloc();
void allocTransactionAbort(int xid);
void allocTransactionCommit(int xid);
void stasis_alloc_aborted(int xid);
void stasis_alloc_committed(int xid);
void TallocInit();
void TallocPostInit();

View file

@ -111,7 +111,7 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots);
compensated_function int TarrayListLength(int xid, recordid rid);
/** Used by Tread() and Tset() to map from arrayList index to recordid. */
recordid dereferenceArrayListRid(int xid, Page * p, int offset);
recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset);
stasis_operation_impl stasis_op_impl_array_list_header_init();
/** @} */

View file

@ -48,7 +48,7 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
compensated_function recordid TlinkedListCreate(int xid, int keySize, int ValueSize);
compensated_function void TlinkedListDelete(int xid, recordid list);
void LinkedListNTAInit();
void TlinkedListNTAInit();
stasis_operation_impl stasis_op_impl_linked_list_insert();
stasis_operation_impl stasis_op_impl_linked_list_remove();
#endif //__LINKED_LIST_NTA_H

View file

@ -896,7 +896,7 @@ void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);
int stasis_fixed_records_per_page(size_t size);
void stasis_blob_initialize_page(Page * p);
void stasis_page_blob_initialize(Page * p);
page_impl slottedLsnFreeImpl();
END_C_DECLS

View file

@ -93,58 +93,58 @@ START_TEST(allocationPolicy_smokeTest)
pages[4] = 0;
allocationPolicy * ap = allocationPolicyInit();
allocationPolicyAddPages(ap, pages);
stasis_allocation_policy_t * ap = stasis_allocation_policy_init();
stasis_allocation_policy_register_new_pages(ap, pages);
availablePage * p1 = allocationPolicyFindPage(ap, 1, 51);
allocationPolicyAllocedFromPage(ap, 1, p1->pageid);
availablePage * p1 = stasis_allocation_policy_pick_suitable_page(ap, 1, 51);
stasis_allocation_policy_alloced_from_page(ap, 1, p1->pageid);
assert(p1);
assert(p1->pageid == 0 || p1->pageid == 1);
allocationPolicyUpdateFreespaceLockedPage(ap, 1, p1, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 1, p1, 0);
availablePage * p2 = allocationPolicyFindPage(ap, 1, 21);
allocationPolicyAllocedFromPage(ap, 1, p2->pageid);
availablePage * p2 = stasis_allocation_policy_pick_suitable_page(ap, 1, 21);
stasis_allocation_policy_alloced_from_page(ap, 1, p2->pageid);
assert(p2->pageid == 3);
allocationPolicyUpdateFreespaceLockedPage(ap, 1, p2, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 1, p2, 0);
availablePage * p3 = allocationPolicyFindPage(ap, 2, 51);
allocationPolicyAllocedFromPage(ap, 2, p3->pageid);
availablePage * p3 = stasis_allocation_policy_pick_suitable_page(ap, 2, 51);
stasis_allocation_policy_alloced_from_page(ap, 2, p3->pageid);
assert(p1->pageid != p3->pageid);
allocationPolicyUpdateFreespaceLockedPage(ap, 2, p3, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 2, p3, 0);
availablePage * p4 = allocationPolicyFindPage(ap, 2, 51);
availablePage * p4 = stasis_allocation_policy_pick_suitable_page(ap, 2, 51);
assert(!p4);
availablePage * p5 = allocationPolicyFindPage(ap, 2, 50);
allocationPolicyAllocedFromPage(ap, 2, p5->pageid);
availablePage * p5 = stasis_allocation_policy_pick_suitable_page(ap, 2, 50);
stasis_allocation_policy_alloced_from_page(ap, 2, p5->pageid);
assert(p5 && p5->pageid == 2);
allocationPolicyUpdateFreespaceLockedPage(ap, 2, p5, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 2, p5, 0);
allocationPolicyUpdateFreespaceLockedPage(ap, 1, p1, 100);
allocationPolicyTransactionCompleted(ap, 1);
allocationPolicyUpdateFreespaceUnlockedPage(ap, p2, 25);
stasis_allocation_policy_update_freespace_locked_page(ap, 1, p1, 100);
stasis_allocation_policy_transaction_completed(ap, 1);
stasis_allocation_policy_update_freespace_unlocked_page(ap, p2, 25);
availablePage * p6 = allocationPolicyFindPage(ap, 2, 50);
allocationPolicyAllocedFromPage(ap, 2, p6->pageid);
availablePage * p6 = stasis_allocation_policy_pick_suitable_page(ap, 2, 50);
stasis_allocation_policy_alloced_from_page(ap, 2, p6->pageid);
assert(p6->pageid == 1 || p6->pageid == 0);
allocationPolicyUpdateFreespaceLockedPage(ap, 2, p6, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 2, p6, 0);
availablePage * p7 = allocationPolicyFindPage(ap, 2, 50);
availablePage * p7 = stasis_allocation_policy_pick_suitable_page(ap, 2, 50);
assert(!p7);
allocationPolicyUpdateFreespaceUnlockedPage(ap, pages[3], 51);
stasis_allocation_policy_update_freespace_unlocked_page(ap, pages[3], 51);
availablePage * p8 =allocationPolicyFindPage(ap, 2, 51);
availablePage * p8 =stasis_allocation_policy_pick_suitable_page(ap, 2, 51);
assert(p8->pageid == 3);
allocationPolicyAllocedFromPage(ap, 2, p8->pageid);
stasis_allocation_policy_alloced_from_page(ap, 2, p8->pageid);
allocationPolicyUpdateFreespaceLockedPage(ap, 2, p8, 0);
stasis_allocation_policy_update_freespace_locked_page(ap, 2, p8, 0);
allocationPolicyTransactionCompleted(ap, 2);
stasis_allocation_policy_transaction_completed(ap, 2);
allocationPolicyDeinit(ap);
stasis_allocation_policy_deinit(ap);
free(pages);
@ -162,7 +162,7 @@ static const int MAX_DESIRED_FREESPACE =
#define PHASE_TWO_COUNT 500000
static int nextxid = 0;
int activexidcount = 0;
static void takeRandomAction(allocationPolicy * ap, int * xids,
static void takeRandomAction(stasis_allocation_policy_t * ap, int * xids,
availablePage ** pages1, availablePage ** pages2) {
switch(myrandom(6)) {
case 0 : { // find page
@ -175,7 +175,7 @@ static void takeRandomAction(allocationPolicy * ap, int * xids,
}
int thefreespace = myrandom(MAX_DESIRED_FREESPACE);
availablePage * p =
allocationPolicyFindPage(ap, xids[thexid], thefreespace);
stasis_allocation_policy_pick_suitable_page(ap, xids[thexid], thefreespace);
if(p) {
DEBUG("alloc succeeds\n");
// xxx validate returned value...
@ -187,7 +187,7 @@ static void takeRandomAction(allocationPolicy * ap, int * xids,
if(!activexidcount) { break; }
int thexid;
while(xids[thexid = myrandom(XACT_COUNT)] == -1) { }
allocationPolicyTransactionCompleted(ap, xids[thexid]);
stasis_allocation_policy_transaction_completed(ap, xids[thexid]);
xids[thexid] = -1;
activexidcount--;
DEBUG("complete");
@ -203,11 +203,11 @@ static void takeRandomAction(allocationPolicy * ap, int * xids,
} else {
minfreespace = 0;
}
availablePage * p = allocationPolicyFindPage(ap, xids[thexid],
availablePage * p = stasis_allocation_policy_pick_suitable_page(ap, xids[thexid],
minfreespace);
if(p && p->lockCount == 0) {
int thenewfreespace = p->freespace+thespacediff;
allocationPolicyUpdateFreespaceUnlockedPage(ap, p, thenewfreespace);
stasis_allocation_policy_update_freespace_unlocked_page(ap, p, thenewfreespace);
// printf("updated freespace unlocked");
}
} break;
@ -223,7 +223,7 @@ static void takeRandomAction(allocationPolicy * ap, int * xids,
if(!activexidcount) { break; }
while(xids[thexid = myrandom(XACT_COUNT)] == -1) { }
pageid_t thepage=myrandom(AVAILABLE_PAGE_COUNT_A + pages2?AVAILABLE_PAGE_COUNT_B:0);
allocationPolicyAllocedFromPage(ap,thexid,thepage); */
stasis_allocation_policy_alloced_from_page(ap,thexid,thepage); */
} break;
@ -266,22 +266,22 @@ START_TEST(allocationPolicy_randomTest) {
}
pages2[AVAILABLE_PAGE_COUNT_B] = 0;
allocationPolicy * ap = allocationPolicyInit();
stasis_allocation_policy_t * ap = stasis_allocation_policy_init();
allocationPolicyAddPages(ap, pages1);
stasis_allocation_policy_register_new_pages(ap, pages1);
for(int k = 0; k < PHASE_ONE_COUNT; k++) {
// Don't pass in pages2; ap doesn't know about them yet!
takeRandomAction(ap, xids, pages1, 0);
}
allocationPolicyAddPages(ap, pages2);
stasis_allocation_policy_register_new_pages(ap, pages2);
for(int k = 0; k < PHASE_TWO_COUNT; k++) {
takeRandomAction(ap, xids, pages1, pages2);
}
allocationPolicyDeinit(ap);
stasis_allocation_policy_deinit(ap);
free(pages1);
free(pages2);