cleaned up includes, autoconf is now optional, since cmake now creates a config.h

This commit is contained in:
Sears Russell 2009-07-26 18:51:45 +00:00
parent 260310e151
commit 081c61b414
51 changed files with 567 additions and 637 deletions

View file

@ -26,6 +26,7 @@
<option id="gnu.c.compiler.option.include.paths.2049135479" name="Include paths (-I)" superClass="gnu.c.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/stasis/src/}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/stasis/}&quot;"/>
<listOptionValue builtIn="false" value="&quot;${workspace_loc:/stasis/build/}&quot;"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.c.compiler.input.1962414466" superClass="cdt.managedbuild.tool.gnu.c.compiler.input"/>
</tool>

View file

@ -56,6 +56,21 @@ if(NOT HAVE_FUSE)
message(STATUS "fuse not found; sample application 'stasis_fuse' will not be built")
endif(NOT HAVE_FUSE)
INCLUDE(CheckFunctionExists)
INCLUDE(CheckCSourceCompiles)
CHECK_FUNCTION_EXISTS(sync_file_range HAVE_SYNC_FILE_RANGE)
CHECK_FUNCTION_EXISTS(fdatasync HAVE_FDATASYNC)
CHECK_C_SOURCE_COMPILES("#define _GNU_SOURCE
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
int main(int argc, char * argv[]) {
argc = O_DIRECT;
}
" HAVE_O_DIRECT)
MACRO(CREATE_CHECK NAME)
ADD_EXECUTABLE(${NAME} ${NAME}.c)
TARGET_LINK_LIBRARIES(${NAME} ${COMMON_LIBRARIES})
@ -75,7 +90,8 @@ ENDMACRO(CREATE_EXECUTABLE)
# Output the config.h file
#CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/build
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src
/usr/include)
# set linker path for this and all subdirs
@ -95,3 +111,5 @@ INSTALL(DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/stasis
FILES_MATCHING PATTERN "*.h"
PATTERN ".svn" EXCLUDE
PATTERN ".deps" EXCLUDE)
CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h)

View file

@ -1,6 +1,6 @@
#include <stasis/transactional.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) {
*slow = malloc(num_rids * sizeof(**slow));

5
config.h.cmake Normal file
View file

@ -0,0 +1,5 @@
#define _GNU_SOURCE
#cmakedefine HAVE_FDATASYNC
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_O_DIRECT

View file

@ -21,33 +21,34 @@
Because slot allocation is already pretty heavy weight, this file
ignores the overhead of tree / hash lookups and mutex acquisition
for now.
for now.
@todo: Right now, allocation policies lump all nested top actions within a transaction into a single allocation group. It could do better if it knew when NTA's began and committed.
*/
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <stasis/common.h>
#include <stasis/allocationPolicy.h>
#include <stasis/lhtable.h>
#include <stasis/redblack.h>
#include <stasis/transactional.h>
#include <assert.h>
#include <stdio.h>
#define ALLOCATION_POLICY_SANITY_CHECKS
// Each availablePage should either be in availablePages, or in
// Each availablePage should either be in availablePages, or in
// xidAlloced and pageOwners. If a transaction allocs and
// deallocs from the same page, then it only has an entry for that
// page in xidAlloced.
//
// xidAlloced is an lhtable of type (int xid) -> (rbtree of availablePage*)
// xidDealloced is an lhtable of type (int xid) -> (lhtable of int pageid -> availablePage *)
// xidDealloced is an lhtable of type (int xid) -> (lhtable of int pageid -> availablePage *)
// pageOwners is an lhtable of type (int pageid) -> (int xid)
// availablePages is a rbtree of availablePage*.
struct allocationPolicy {
struct allocationPolicy {
struct LH_ENTRY(table) * xidAlloced;
struct LH_ENTRY(table) * xidDealloced;
struct RB_ENTRY(tree) * availablePages;
@ -59,24 +60,24 @@ inline static int cmpPageid(const void * ap, const void * bp, const void * param
const availablePage * a = (const availablePage *)ap;
const availablePage * b = (const availablePage *)bp;
if(a->pageid < b->pageid) {
if(a->pageid < b->pageid) {
return -1;
} else if (a->pageid > b->pageid) {
} else if (a->pageid > b->pageid) {
return 1;
} else {
return 0;
}
}
static int cmpFreespace(const void * ap, const void * bp, const void * param) {
static int cmpFreespace(const void * ap, const void * bp, const void * param) {
const availablePage * a = (const availablePage *) ap;
const availablePage * b = (const availablePage *) bp;
if(a->freespace < b->freespace) {
if(a->freespace < b->freespace) {
return -1;
} else if (a->freespace > b->freespace) {
} else if (a->freespace > b->freespace) {
return 1;
} else {
} else {
return cmpPageid(ap,bp,param);
}
}
@ -85,10 +86,10 @@ inline static availablePage* getAvailablePage(stasis_allocation_policy_t * ap, p
return (availablePage*) LH_ENTRY(find)(ap->allPages, &pageid, sizeof(pageid));
}
inline static void insert_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void insert_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
if(!pages) {
if(!pages) {
pages = RB_ENTRY(init)(cmpFreespace, 0);
LH_ENTRY(insert)(ap->xidAlloced, &xid, sizeof(xid), pages);
}
@ -96,7 +97,7 @@ inline static void insert_xidAlloced(stasis_allocation_policy_t * ap, int xid, a
assert(check == p);
}
inline static void remove_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void remove_xidAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
assert(p->lockCount);
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid));
assert(pages);
@ -104,10 +105,10 @@ inline static void remove_xidAlloced(stasis_allocation_policy_t * ap, int xid, a
assert(check == p); // sometimes fails
}
inline static void insert_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void insert_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
if(!pages) {
if(!pages) {
pages = RB_ENTRY(init)(cmpPageid, 0);
LH_ENTRY(insert)(ap->xidDealloced, &xid, sizeof(xid), pages);
}
@ -117,22 +118,22 @@ inline static void insert_xidDealloced(stasis_allocation_policy_t * ap, int xid,
assert(check);
}
inline static void remove_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void remove_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
assert(pages);
const availablePage * check = RB_ENTRY(delete)(p, pages);
assert(check == p);
}
inline static int find_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static int find_xidDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
struct RB_ENTRY(tree) * pages = LH_ENTRY(find)(ap->xidDealloced, &xid, sizeof(xid));
if(!pages) { return 0; }
if(!pages) { return 0; }
const availablePage * check = RB_ENTRY(find)(p, pages);
if(check) {
return 1;
} else {
return 0;
if(check) {
return 1;
} else {
return 0;
}
}
@ -149,7 +150,7 @@ inline static void lockAlloced(stasis_allocation_policy_t * ap, int xid, ava
insert_xidAlloced(ap, xid, p);
}
inline static void unlockAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void unlockAlloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
remove_xidAlloced(ap, xid, p);
p->lockCount--;
@ -165,15 +166,15 @@ inline static void unlockAlloced(stasis_allocation_policy_t * ap, int xid, ava
}
inline static void lockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
if(p->lockCount == 0) {
inline static void lockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
if(p->lockCount == 0) {
// xid should own it
lockAlloced(ap, xid, p);
} else if(p->lockCount == 1) {
int * xidp = LH_ENTRY(find)(ap->pageOwners, &(p->pageid), sizeof(p->pageid));
if(!xidp) {
// The only active transaction that touched this page deallocated from it,
if(!xidp) {
// The only active transaction that touched this page deallocated from it,
// so just add the page to our dealloced table.
p->lockCount++;
@ -196,21 +197,21 @@ inline static void lockDealloced(stasis_allocation_policy_t * ap, int xid, avail
p->lockCount++;
insert_xidDealloced(ap, xid, p);
}
} else {
// not owned by anyone... is it already in this xid's Dealloced table?
if(!find_xidDealloced(ap, xid, p)) {
} else {
// not owned by anyone... is it already in this xid's Dealloced table?
if(!find_xidDealloced(ap, xid, p)) {
p->lockCount++;
insert_xidDealloced(ap, xid, p);
}
}
}
inline static void unlockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
inline static void unlockDealloced(stasis_allocation_policy_t * ap, int xid, availablePage * p) {
assert(p->lockCount > 0);
p->lockCount--;
remove_xidDealloced(ap, xid, p);
if(!p->lockCount) {
if(!p->lockCount) {
// put it back into available pages.
LH_ENTRY(remove)(ap->pageOwners, &(p->pageid), sizeof(p->pageid)); // XXX new feb-29
@ -219,7 +220,7 @@ inline static void unlockDealloced(stasis_allocation_policy_t * ap, int xid, ava
}
}
stasis_allocation_policy_t * stasis_allocation_policy_init() {
stasis_allocation_policy_t * stasis_allocation_policy_init() {
stasis_allocation_policy_t * ap = malloc(sizeof(stasis_allocation_policy_t));
ap->xidAlloced = LH_ENTRY(create)(10);
@ -230,10 +231,10 @@ stasis_allocation_policy_t * stasis_allocation_policy_init() {
return ap;
}
void stasis_allocation_policy_deinit(stasis_allocation_policy_t * ap) {
void stasis_allocation_policy_deinit(stasis_allocation_policy_t * ap) {
const availablePage * next;
while(( next = RB_ENTRY(min)(ap->availablePages) )) {
while(( next = RB_ENTRY(min)(ap->availablePages) )) {
RB_ENTRY(delete)(next, ap->availablePages);
free((void*)next);
}
@ -254,7 +255,7 @@ void stasis_allocation_policy_register_new_pages(stasis_allocation_policy_t * ap
LH_ENTRY(insert)(ap->allPages, &(newPages[i]->pageid), sizeof(newPages[i]->pageid), newPages[i]);
}
}
/// XXX need updateAlloced, updateFree, which remove a page, change
@ -263,10 +264,10 @@ void stasis_allocation_policy_register_new_pages(stasis_allocation_policy_t * ap
availablePage * stasis_allocation_policy_pick_suitable_page(stasis_allocation_policy_t * ap, int xid, int freespace) {
// For the best fit amongst the pages in availablePages, call:
//
// rblookup(RB_LUGREAT, key, availablePages)
//
// rblookup(RB_LUGREAT, key, availablePages)
//
// For the page with the most freespace, call:
//
//
// rbmax(availablePages);
//
availablePage tmp = { .freespace = freespace, .pageid = 0, .lockCount = 0 };
@ -275,21 +276,21 @@ availablePage * stasis_allocation_policy_pick_suitable_page(stasis_allocation_po
// If we haven't heard of this transaction yet, then create an entry
// for it.
if(0 == (locks = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid)))) {
if(0 == (locks = LH_ENTRY(find)(ap->xidAlloced, &xid, sizeof(xid)))) {
// Since this is cmpPageid, we can change the amount of freespace
// without modifying the tree.
locks = RB_ENTRY(init)(cmpFreespace, 0);
locks = RB_ENTRY(init)(cmpFreespace, 0);
LH_ENTRY(insert)(ap->xidAlloced, &xid, sizeof(xid), locks);
}
}
const availablePage *ret;
const availablePage *ret;
// Does this transaction already have an appropriate page?
if(!(ret = RB_ENTRY(lookup)(RB_LUGREAT, &tmp, locks))) {
// No; get a page from the availablePages.
// No; get a page from the availablePages.
ret = RB_ENTRY(lookup)(RB_LUGREAT, &tmp, ap->availablePages);
/*if(ret) {
@ -299,7 +300,7 @@ availablePage * stasis_allocation_policy_pick_suitable_page(stasis_allocation_po
}
// Done. (If ret is null, then it's the caller's problem.)
return (availablePage*) ret;
return (availablePage*) ret;
}
int stasis_allocation_policy_can_xid_alloc_from_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
@ -319,7 +320,7 @@ int stasis_allocation_policy_can_xid_alloc_from_page(stasis_allocation_policy_t
}
}
void stasis_allocation_policy_alloced_from_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
void stasis_allocation_policy_alloced_from_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
availablePage * p = getAvailablePage(ap, page);
const availablePage * check1 = RB_ENTRY(find)(p, ap->availablePages);
int * xidp = LH_ENTRY(find)(ap->pageOwners, &(page), sizeof(page));
@ -343,7 +344,7 @@ void stasis_allocation_policy_alloced_from_page(stasis_allocation_policy_t *ap,
}
}
void stasis_allocation_policy_lock_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
void stasis_allocation_policy_lock_page(stasis_allocation_policy_t *ap, int xid, pageid_t page) {
availablePage * p = getAvailablePage(ap, page);
lockDealloced(ap, xid, p);
@ -359,7 +360,7 @@ void stasis_allocation_policy_transaction_completed(stasis_allocation_policy_t *
availablePage * next;
while(( next = (void*)RB_ENTRY(min)(locks) )) {
while(( next = (void*)RB_ENTRY(min)(locks) )) {
unlockAlloced(ap, xid, (next)); // This is really inefficient. (We're wasting hashtable lookups. Also, an iterator would be faster.)
}
@ -372,8 +373,8 @@ void stasis_allocation_policy_transaction_completed(stasis_allocation_policy_t *
if(locks) {
availablePage * next;
while(( next = (void*)RB_ENTRY(min)(locks) )) {
while(( next = (void*)RB_ENTRY(min)(locks) )) {
unlockDealloced(ap, xid, (availablePage*)next); // This is really inefficient. (We're wasting hashtable lookups. Also, an iterator would be faster.)
}

View file

@ -1,12 +1,11 @@
#include <stasis/common.h>
#include <stasis/transactional.h>
#include <stdlib.h>
#include <stdio.h>
typedef struct {
typedef struct {
byte * keyArray;
byte * valueArray;
unsigned int element;
unsigned int keySize;
unsigned int valueSize;
unsigned int keySize;
unsigned int valueSize;
unsigned int elementCount;
} array_iterator_t;
@ -55,14 +54,14 @@ int arrayIterator_value(int xid, void * impl, byte ** value) {
}
typedef struct {
typedef struct {
byte * array;
int element;
int elementSize;
int elementSize;
int elementCount;
} pointer_array_iterator_t;
/*lladdIterator_t * ptrArray_iterator(byte * array, int elementCount) {
}*/

View file

@ -1,5 +1,3 @@
#include <pthread.h>
#include <config.h>
#include <stasis/bufferManager/bufferHash.h>
#include <stasis/bufferPool.h>
@ -11,7 +9,9 @@
#include <stasis/replacementPolicy.h>
#include <stasis/bufferManager.h>
#include <stasis/page.h>
#include <assert.h>
#include <stdio.h>
//#define LATCH_SANITY_CHECKING

View file

@ -1,5 +1,3 @@
#include <string.h>
#include <stasis/common.h>
#include <stasis/bufferManager.h>
@ -12,6 +10,8 @@
#include <stasis/lhtable.h>
#include <stdio.h>
static struct LH_ENTRY(table) *activePages; /* page lookup */
static pthread_mutex_t loadPagePtr_mutex;
static Page * dummy_page;

View file

@ -14,16 +14,14 @@
#include <stasis/logger/logger2.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <stdlib.h>
/** For O_DIRECT. It's unclear that this is the correct thing to \#define, but it works under linux. */
#define __USE_GNU
#include <stdio.h>
#include <fcntl.h>
#include <unistd.h>
/** Allows boostrapping of the header page. */
/** Allows bootstrapping of the header page. */
#include <stasis/operations/pageOperations.h>
static int stable = -1;

View file

@ -1,5 +1,3 @@
#include <stdlib.h>
#include <config.h>
#include <stasis/transactional.h>
#include <stasis/bufferManager.h>
#include <stasis/bufferPool.h>

View file

@ -3,15 +3,14 @@
int ___compensation_count___ = 0;
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
static pthread_key_t error_key;
void compensations_init () {
int ret = pthread_key_create(&error_key, NULL);
assert(!ret);
pthread_setspecific(error_key, NULL);
assert(!ret);
pthread_setspecific(error_key, NULL);
}
void compensations_deinit() {

View file

@ -1,6 +1,5 @@
#include <stasis/consumer.h>
#include <assert.h>
#include <stdlib.h>
#include <stasis/logger/logMemory.h>
static lladdConsumer_def_t consumers[MAX_CONSUMER_TYPES];
@ -10,7 +9,7 @@ static void lladdConsumer_register(int type, lladdConsumer_def_t info) {
consumers[type] = info;
}
void consumer_init() {
void consumer_init() {
lladdConsumer_def_t logMemory_def = {
logMemory_consumer_push,
logMemory_consumer_close

View file

@ -1,7 +1,5 @@
// Calculate a CRC 32 checksum.
#include <stasis/crc32.h> /*Added 10-6-04 */
#include <stdlib.h>
#include <stdio.h>
// LAST MODIFIED:[7-28-93]

View file

@ -1,6 +1,4 @@
#include <config.h>
#include <stdlib.h>
#include <string.h>
#include <stasis/common.h>
#include <stasis/doubleLinkedList.h>
#include <assert.h>
@ -12,7 +10,7 @@ list * LL_ENTRY(create)() {
list* ret = malloc(sizeof(list));
// bypass const annotation on head, tail...
list tmp = {
list tmp = {
malloc(sizeof(node_t)),
malloc(sizeof(node_t))
};
@ -25,16 +23,16 @@ list * LL_ENTRY(create)() {
return ret;
}
void LL_ENTRY(destroy)(list* l) {
void LL_ENTRY(destroy)(list* l) {
value_t * n;
while((n = LL_ENTRY(pop)(l))) {
while((n = LL_ENTRY(pop)(l))) {
// nop
}
free(l->head);
free(l->tail);
free(l);
}
node_t * LL_ENTRY(push)(list* l, value_t * v) {
node_t * LL_ENTRY(push)(list* l, value_t * v) {
node_t * n = malloc(sizeof(node_t));
n->v = v;
LL_ENTRY(pushNode)(l, n);
@ -42,15 +40,15 @@ node_t * LL_ENTRY(push)(list* l, value_t * v) {
}
value_t* LL_ENTRY(pop) (list* l) {
node_t * n = LL_ENTRY(popNode)(l);
if(n) {
if(n) {
value_t * v = n->v;
free(n);
return v;
} else {
} else {
return 0;
}
}
node_t * LL_ENTRY(unshift)(list* l, value_t * v) {
node_t * LL_ENTRY(unshift)(list* l, value_t * v) {
node_t * n = malloc(sizeof(node_t));
n->v = v;
LL_ENTRY(unshiftNode)(l, n);
@ -58,17 +56,17 @@ node_t * LL_ENTRY(unshift)(list* l, value_t * v) {
}
value_t * LL_ENTRY(shift) (list* l) {
node_t * n = LL_ENTRY(shiftNode)(l);
if(n) {
if(n) {
value_t * v = n->v;
free(n);
return v;
} else {
} else {
return 0;
}
}
void LL_ENTRY(pushNode)(list* l, node_t * n) {
void LL_ENTRY(pushNode)(list* l, node_t * n) {
// Need to update 3 nodes: n , tail, tail->prev
@ -85,7 +83,7 @@ void LL_ENTRY(pushNode)(list* l, node_t * n) {
node_t* LL_ENTRY(popNode) (list* l) {
node_t * n = l->tail->prev;
assert(n != l->tail);
if(n != l->head) {
if(n != l->head) {
assert(n->prev != 0);
assert(n->next == l->tail);
@ -96,12 +94,12 @@ node_t* LL_ENTRY(popNode) (list* l) {
l->tail->prev = n->prev;
return n;
} else {
} else {
assert(n->prev == 0);
return 0;
}
}
void LL_ENTRY(unshiftNode)(list* l, node_t * n) {
void LL_ENTRY(unshiftNode)(list* l, node_t * n) {
// n
n->prev = l->head;
@ -116,27 +114,27 @@ void LL_ENTRY(unshiftNode)(list* l, node_t * n) {
node_t * LL_ENTRY(shiftNode) (list* l) {
node_t * n = l->head->next;
assert(n != l->head);
if(n != l->tail) {
if(n != l->tail) {
assert(n->next != 0);
assert(n->prev == l->head);
// n->next
n->next->prev = n->prev;
// head
l->head->next = n->next;
return n;
} else {
} else {
assert(n->next == 0);
return 0;
}
}
void LL_ENTRY(remove)(list * l, node_t * n) {
void LL_ENTRY(remove)(list * l, node_t * n) {
LL_ENTRY(removeNoFree)(l,n);
free(n);
}
void LL_ENTRY(removeNoFree)(list * l, node_t * n) {
void LL_ENTRY(removeNoFree)(list * l, node_t * n) {
assert(n != l->head);
assert(n != l->tail);
assert(n->next != n);

View file

@ -1,14 +1,11 @@
#include <stasis/fifo.h>
#include <stasis/crc32.h>
#include <stdlib.h>
#include <stdio.h>
#include <stasis/logger/logMemory.h>
#include <string.h>
#include <assert.h>
/**
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
#include <stdio.h>
/**
Obtain a member of a fifoPool based on the value of multiplexKey. Use CRC32 to assign the key to a consumer.
*/
lladdFifo_t * lladdFifoPool_getFifoCRC32( lladdFifoPool_t * pool, byte * multiplexKey, size_t multiplexKeySize) {
int memberId = stasis_crc32(multiplexKey, multiplexKeySize, (unsigned int)-1) % pool->fifoCount;
@ -28,7 +25,7 @@ void lladdFifoPool_markDirty(int xid, lladdFifoPool_t * pool, lladdFifo_t * fifo
consumerCount is the number of consumers in the pool.
@todo this function should be generalized to other consumer implementations.
*/
lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize,
lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSize,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
@ -44,7 +41,7 @@ lladdFifoPool_t * lladdFifoPool_ringBufferInit (int consumerCount, int bufferSiz
return pool;
}
typedef struct {
typedef struct {
int maxPtrs;
int outPtrs;
pthread_mutex_t mutex;
@ -58,7 +55,7 @@ typedef struct pointerFifoEntry {
int valSize;
} pointerFifoEntry;
typedef struct {
typedef struct {
pointerFifoEntry * first;
pointerFifoEntry * last;
pointerFifoEntry * current;
@ -104,13 +101,13 @@ int lladdFifoPool_iterator_next(int xid, void * it) {
}
int lladdFifoPool_iterator_tryNext(int xid, void * it) {
pointerFifoImpl * impl = (pointerFifoImpl *) it;
pthread_mutex_lock(&(impl->mutex));
if(impl->last == NULL) {
pthread_mutex_unlock(&(impl->mutex));
return 0;
} else {
} else {
doNext(xid, impl);
}
@ -152,10 +149,10 @@ void lladdFifoPool_iterator_close(int xid, void * it) {
pthread_mutex_lock(&(impl->mutex));
assert(impl->eof);
assert((!impl->first) && (!impl->last));
if(firstWarn) {
if(firstWarn) {
printf("Leaking iterators in lladdFifoPool_iterator_close\n");
firstWarn = 0;
}
}
pthread_mutex_unlock(&(impl->mutex));
}
@ -184,12 +181,12 @@ int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize,
memcpy(((byte*)(entry+1))+keySize, val, valSize);
entry->keySize = keySize;
entry->valSize = valSize;
pthread_mutex_lock(&(impl->mutex));
entry->next = impl->first;
if(impl->last == NULL) {
impl->last = entry;
if(impl->last == NULL) {
impl->last = entry;
assert(!impl->first);
} else {
assert(impl->first);
@ -201,14 +198,14 @@ int lladdFifoPool_consumer_push(int xid, void * it, byte * key, size_t keySize,
pthread_cond_broadcast(&(impl->readOK));
pthread_mutex_unlock(&(impl->mutex));
return 0;
}
lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerCount,
lladdFifoPool_getFifo_t * getFifo, lladdFifo_t * dirtyPoolFifo) {
lladdFifoPool_t * pool = malloc(sizeof(lladdFifoPool_t));
pool->pool = malloc(sizeof(lladdFifo_t*) * consumerCount);
@ -226,7 +223,7 @@ lladdFifoPool_t * lladdFifoPool_pointerPoolInit (int consumerCount, int pointerC
pool->pool[i]->consumer = malloc(sizeof(lladdConsumer_t));
pool->pool[i]->iterator->type = POINTER_ITERATOR;
pool->pool[i]->consumer->type = POINTER_CONSUMER;
pointerFifoImpl * impl =
pointerFifoImpl * impl =
(pointerFifoImpl*) (pool->pool[i]->consumer->impl = pool->pool[i]->iterator->impl = malloc(sizeof(pointerFifoImpl)));
impl->first = NULL;
impl->last = NULL;

View file

@ -1,9 +1,7 @@
#include <stasis/graph.h>
#include <alloca.h>
#include <assert.h>
#include <stdlib.h>
#include <stasis/page.h>
#include <stasis/crc32.h>
#include <assert.h>
int numOut = 0;
int numTset = 0;
@ -39,12 +37,12 @@ pthread_mutex_t counters = PTHREAD_MUTEX_INITIALIZER;
/** @todo need to load the correct pages, since the local fifo doesn't refer to a single page!!! */
void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t * global, lladdFifoPool_t * pool, int num) {
int * node = alloca(sizeof(int) * (transClos_outdegree+1));
int * nodeScratch = alloca(sizeof(int) * (transClos_outdegree+1));
int myFifo = -1;
int deltaNumOut = 0;
int deltaNumSkipped = 0;
int deltaNumShortcutted = 0;
@ -54,7 +52,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
byte * brid;
recordid localRid;
size_t size = Titerator_value(xid, local->iterator, &brid);
assert(size == sizeof(recordid));
recordid * rid = (recordid*)brid;
localRid = *rid;
@ -62,11 +60,11 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
if(myFifo == -1) {
if(useCRC) {
myFifo = stasis_crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned int)-1) % pool->fifoCount;
} else {
} else {
myFifo = rid->page % pool->fifoCount;
}
// printf("Switched locality sets... %d\n", myFifo);
} else {
} else {
// assert(myFifo == crc32((byte*)&(rid->page), sizeof(rid->page), (unsigned long)-1L) % pool->fifoCount);
}
@ -80,11 +78,11 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
numTset++;
Tset(xid, localRid, (const byte*)node); /// @todo TsetRange?
int i;
for(i =0 ; i < transClos_outdegree; i++) {
for(i =0 ; i < transClos_outdegree; i++) {
recordid nextRid = arrayList;
nextRid.slot = node[i];
Page * p = loadPage(xid, arrayList.page); // just pin it forever and ever
nextRid = stasis_array_list_dereference_recordid(xid, p, nextRid.slot);
nextRid = stasis_array_list_dereference_recordid(xid, p, nextRid.slot);
releasePage(p);
int thisFifo = stasis_crc32((byte*)&(nextRid.page), sizeof(nextRid.page), (unsigned int)-1) % pool->fifoCount;
@ -100,7 +98,7 @@ void multiTraverse(int xid, recordid arrayList, lladdFifo_t * local, lladdFifo_t
deltaNumOut++;
} else {
deltaNumSkipped++;
}
}
} else {
// @todo check nextRid to see if we're the worker that will consume it, or (easier) if it stays on the same page.
Tconsumer_push(xid, global->consumer, NULL, 0, (byte*)&nextRid, sizeof(recordid));

View file

@ -1,6 +1,5 @@
#include <stasis/io/handle.h>
#include <stdio.h>
#include <stdlib.h>
/** @file

View file

@ -1,20 +1,10 @@
#include <config.h>
#ifdef HAVE_SYNC_FILE_RANGE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <stasis/common.h>
#include <stasis/io/handle.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
/** @file */

View file

@ -1,8 +1,4 @@
#include <stasis/io/handle.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
/** @file */
typedef struct mem_impl {
@ -25,25 +21,25 @@ static int mem_close(stasis_handle_t * h) {
static lsn_t mem_start_position(stasis_handle_t *h) {
lsn_t ret;
mem_impl* impl = (mem_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
ret = impl->start_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static lsn_t mem_end_position(stasis_handle_t *h) {
static lsn_t mem_end_position(stasis_handle_t *h) {
lsn_t ret;
mem_impl* impl = (mem_impl*)(h->impl);
pthread_mutex_lock(&impl->mut);
ret = impl->end_pos;
pthread_mutex_unlock(&impl->mut);
return ret;
}
static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
mem_impl* impl = (mem_impl*)(h->impl);
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
@ -53,13 +49,13 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
int error = 0;
if(impl->start_pos > off) {
if(impl->start_pos > off) {
error = EDOM;
} else if(impl->end_pos > off+len) {
} else if(impl->end_pos > off+len) {
// Just need to return buffer; h's state is unchanged.
} else {
} else {
byte * newbuf;
if(off+len-impl->start_pos) {
if(off+len-impl->start_pos) {
newbuf = realloc(impl->buf, off+len - impl->start_pos);
} else {
free(impl->buf);
@ -68,19 +64,19 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
if(newbuf) {
impl->buf = newbuf;
impl->end_pos = off+len;
} else {
} else {
error = ENOMEM;
}
}
if(error) {
if(error) {
ret->h = h;
ret->off = 0;
ret->buf = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
} else {
} else {
ret->h = h;
ret->off = off;
ret->buf = &(impl->buf[off-impl->start_pos]);
@ -92,7 +88,7 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
return ret;
}
static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
lsn_t len) {
mem_impl * impl = (mem_impl*)(h->impl);
@ -108,19 +104,19 @@ static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
if(newlen == 0) {
free(impl->buf);
newbuf = malloc(0);
} else {
} else {
newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos);
}
if(newbuf) {
if(newbuf) {
impl->buf = newbuf;
ret->h = h;
ret->off = off;
ret->buf = &(impl->buf[off-impl->start_pos]);
ret->len = len;
ret->impl = 0;
ret->error = 0;
} else {
} else {
// if we requested a zero length buffer, this is OK.
ret->h = h;
ret->off = 0;
@ -131,7 +127,7 @@ static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
}
return ret;
}
static int mem_release_write_buffer(stasis_write_buffer_t * w) {
static int mem_release_write_buffer(stasis_write_buffer_t * w) {
mem_impl * impl = (mem_impl*)(w->h->impl);
pthread_mutex_unlock(&(impl->mut));
free(w);
@ -139,14 +135,14 @@ static int mem_release_write_buffer(stasis_write_buffer_t * w) {
}
static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
lsn_t off, lsn_t len) {
mem_impl * impl = (mem_impl*)(h->impl);
pthread_mutex_lock(&(impl->mut));
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
if(!ret) { return NULL; }
if(off < impl->start_pos || off + len > impl->end_pos) {
if(off < impl->start_pos || off + len > impl->end_pos) {
ret->h = h;
ret->buf = 0;
ret->len = 0;
@ -163,34 +159,34 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
}
return ret;
}
static int mem_release_read_buffer(stasis_read_buffer_t * r) {
static int mem_release_read_buffer(stasis_read_buffer_t * r) {
mem_impl * impl = (mem_impl*)(r->h->impl);
pthread_mutex_unlock(&(impl->mut));
free(r);
return 0;
}
static int mem_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
static int mem_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
// Overlapping writes aren't atomic; no latch needed.
stasis_write_buffer_t * w = mem_write_buffer(h, off, len);
int ret;
if(w->error) {
if(w->error) {
ret = w->error;
} else {
} else {
memcpy(w->buf, dat, len);
ret = 0;
}
}
mem_release_write_buffer(w);
return ret;
}
static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
stasis_write_buffer_t * w = mem_append_buffer(h, len);
int ret;
if(w->error) {
if(w->error) {
ret = w->error;
} else {
} else {
memcpy(w->buf, dat, len);
ret = 0;
}
@ -199,16 +195,16 @@ static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t
return ret;
}
static int mem_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
static int mem_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
stasis_read_buffer_t * r = mem_read_buffer(h, off, len);
int ret;
if(r->error) {
if(r->error) {
ret = r->error;
} else {
} else {
memcpy(buf, r->buf, len);
ret = 0;
}
}
mem_release_read_buffer(r);
return ret;
}
@ -218,20 +214,20 @@ static int mem_force(stasis_handle_t *h) {
static int mem_force_range(stasis_handle_t *h,lsn_t start, lsn_t stop) {
return 0;
}
static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) {
static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) {
mem_impl* impl = (mem_impl*) h->impl;
pthread_mutex_lock(&(impl->mut));
if(new_start < impl->start_pos) {
if(new_start < impl->start_pos) {
pthread_mutex_unlock(&impl->mut);
return 0;
}
if(new_start > impl->end_pos) {
}
if(new_start > impl->end_pos) {
pthread_mutex_unlock(&impl->mut);
return EDOM;
}
byte * new_buf = malloc(impl->end_pos -new_start);
memcpy(new_buf, &(impl->buf[new_start - impl->start_pos]), impl->end_pos - new_start);
free(impl->buf);
@ -274,6 +270,6 @@ stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) {
impl->start_pos = start_offset;
impl->end_pos = start_offset;
impl->buf = malloc(0);
return ret;
}

View file

@ -1,15 +1,11 @@
#include "config.h"
#include <stasis/common.h>
#include <stasis/constants.h>
#include <stasis/io/handle.h>
#include <stasis/linkedlist.h>
#include <stasis/redblack.h>
#include <pthread.h>
#include <errno.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
/**

View file

@ -1,8 +1,5 @@
#include <config.h>
#define _XOPEN_SOURCE 500
#ifdef HAVE_SYNC_FILE_RANGE
#define _GNU_SOURCE
#endif
#include <fcntl.h>
#include <unistd.h>

View file

@ -1,35 +1,36 @@
#define _GNU_SOURCE
#include <stdio.h>
#include <stasis/common.h>
#include <stasis/io/rangeTracker.h>
#include <stasis/redblack.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
struct rangeTracker {
struct rangeTracker {
struct RB_ENTRY(tree)* ranges;
int quantization;
};
static int cmp_transition(const void * a, const void * b, const void * arg) {
static int cmp_transition(const void * a, const void * b, const void * arg) {
const transition * ta = a;
const transition * tb = b;
return ta->pos - tb->pos;
}
rangeTracker * rangeTrackerInit(int quantization) {
rangeTracker * rangeTrackerInit(int quantization) {
rangeTracker * ret = malloc(sizeof(rangeTracker));
ret->ranges = RB_ENTRY(init)(cmp_transition, 0);
ret->quantization = quantization;
return ret;
}
void rangeTrackerDeinit(rangeTracker * rt) {
void rangeTrackerDeinit(rangeTracker * rt) {
RBLIST * l = RB_ENTRY(openlist)(rt->ranges);
const transition * t;
while((t = RB_ENTRY(readlist)(l))) {
while((t = RB_ENTRY(readlist)(l))) {
RB_ENTRY(delete)(t, rt->ranges);
fprintf(stderr, "WARNING: Detected leaked range in rangeTracker!\n");
// Discard const to free t
@ -40,10 +41,10 @@ void rangeTrackerDeinit(rangeTracker * rt) {
free(rt);
}
static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
// Abort on no-op requests.
assert(delta);
assert(delta);
assert(r->start < r->stop);
@ -58,7 +59,7 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
assert(t->delta);
assert(t->pins >= 0);
assert(t->pins + t->delta >= 0);
if(t->pos != r->start) {
if(t->pos != r->start) {
int newpins = t->pins + t->delta;
t = malloc(sizeof(transition));
t->pos = r->start;
@ -67,11 +68,11 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
assert(newpins >= 0);
RB_ENTRY(search)(t, rt->ranges); // insert
curpin = t->pins + t->delta;
} else {
} else {
t->delta += delta;
curpin = t->pins + t->delta;
assert(curpin >= 0);
if(t->delta == 0) {
if(t->delta == 0) {
RB_ENTRY(delete)(t, rt->ranges);
key.pos = t->pos;
free(t);
@ -79,11 +80,11 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
t = &key;
}
}
} else {
} else {
t = malloc(sizeof(transition));
t->pos = r->start;
t->delta = delta;
t->pins = 0;
t->pins = 0;
RB_ENTRY(search)(t, rt->ranges); // insert
curpin = t->pins + t->delta;
assert(curpin >= 0);
@ -95,9 +96,9 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
// libredblack does not provide a traversal function that starts at
// a particular point in the tree...
// Discarding const.
while((t = (transition *) rblookup(RB_LUGREAT, t, rt->ranges)) && t->pos < r->stop) {
while((t = (transition *) rblookup(RB_LUGREAT, t, rt->ranges)) && t->pos < r->stop) {
assert(t);
t->pins += delta;
assert(t->delta);
@ -105,7 +106,7 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
curpin = t->pins + t->delta;
assert(curpin >= 0);
}
if(!t || t->pos != r->stop) {
if(!t || t->pos != r->stop) {
// Need to allocate new transition
t = malloc(sizeof(transition));
t->pos = r->stop;
@ -113,22 +114,22 @@ static void rangeTrackerDelta(rangeTracker * rt, const range * r, int delta) {
t->pins = curpin;
assert(curpin >= 0);
RB_ENTRY(search)(t, rt->ranges); // insert
} else {
} else {
// Found existing transition at end of range.
assert(t->pos == r->stop);
t->pins += delta;
assert(t->pins == curpin);
t->delta -= delta;
if(t->delta == 0) {
if(t->delta == 0) {
RB_ENTRY(delete)(t, rt->ranges);
free(t);
}
}
}
static range ** rangeTrackerToArray(rangeTracker * rt) {
static range ** rangeTrackerToArray(rangeTracker * rt) {
// count ranges.
int range_count = 0;
const transition * t;
@ -136,16 +137,16 @@ static range ** rangeTrackerToArray(rangeTracker * rt) {
RBLIST * list = RB_ENTRY(openlist) (rt->ranges);
while((t = RB_ENTRY(readlist)(list))) {
if(!(t->pins + t->delta)) {
if(!(t->pins + t->delta)) {
// end of a range.
in_range = 0;
range_count++;
} else {
} else {
in_range = 1;
}
}
RB_ENTRY(closelist)(list);
if(in_range) {
if(in_range) {
range_count++;
}
@ -155,11 +156,11 @@ static range ** rangeTrackerToArray(rangeTracker * rt) {
in_range = 0;
list = RB_ENTRY(openlist) (rt->ranges);
t = RB_ENTRY(readlist)(list);
if(!t) {
if(!t) {
assert(range_count == 0);
RB_ENTRY(closelist)(list);
return ret;
} else {
} else {
assert(!t->pins);
assert(t->delta);
assert(! ret[next_range] );
@ -167,15 +168,15 @@ static range ** rangeTrackerToArray(rangeTracker * rt) {
ret[next_range]->start = t->pos;
in_range = 1;
}
while((t = RB_ENTRY(readlist)(list))) {
if(t->pins + t->delta) {
if(!in_range) {
while((t = RB_ENTRY(readlist)(list))) {
if(t->pins + t->delta) {
if(!in_range) {
assert(! ret[next_range]);
ret[next_range] = malloc(sizeof(range));
ret[next_range]->start = t->pos;
in_range = 1;
}
} else {
} else {
// end of range.
assert(in_range);
ret[next_range]->stop = t->pos;
@ -186,15 +187,15 @@ static range ** rangeTrackerToArray(rangeTracker * rt) {
RB_ENTRY(closelist)(list);
assert(next_range == range_count);
return ret;
}
static void pinnedRanges(const rangeTracker * rt, const range * request, rangeTracker * ret, int delta) {
static void pinnedRanges(const rangeTracker * rt, const range * request, rangeTracker * ret, int delta) {
transition key;
// we will start looking at the tree at the first transition after key.pos.
key.pos = rangeTrackerRoundDown(request->start, rt->quantization);
key.pos = rangeTrackerRoundDown(request->start, rt->quantization);
const transition * t = &key;
int in_range = 0; // 0 if the last transition marked the end of a range.
@ -205,63 +206,63 @@ static void pinnedRanges(const rangeTracker * rt, const range * request, rangeTr
expanded_range.start = rangeTrackerRoundDown(request->start, rt->quantization);
expanded_range.stop = rangeTrackerRoundUp(request->stop, rt->quantization);
while((t = rblookup(RB_LUGREAT, t, rt->ranges))) {
while((t = rblookup(RB_LUGREAT, t, rt->ranges))) {
assert(t->delta);
if(t->pos >= expanded_range.stop) {
if(in_range) {
if(t->pos >= expanded_range.stop) {
if(in_range) {
assert(t->pins);
assert(have_range);
cur.stop = expanded_range.stop;
assert(cur.stop != cur.start);
in_range = 0;
} else {
if(!have_range) {
} else {
if(!have_range) {
// we are at first transition
if(t->pins) {
if(t->pins) {
cur = expanded_range;
have_range = 1;
} else {
} else {
// no ranges are pinned.
}
}
}
// Pretend we hit the end of the tree.
break;
} else {
if(in_range) {
} else {
if(in_range) {
assert(have_range);
assert(t->pins);
if(!(t->pins + t->delta)) {
if(!(t->pins + t->delta)) {
cur.stop = rangeTrackerRoundUp(t->pos, rt->quantization);
assert(cur.stop != cur.start);
in_range = 0;
} else {
assert(in_range);
}
} else {
} else {
// not in range.
if(!have_range) {
if(!have_range) {
// we are at first transition
if(t->pins) {
if(t->pins) {
cur.start = expanded_range.start;
in_range = t->pins + t->delta;
if(! in_range) {
if(! in_range) {
cur.stop = rangeTrackerRoundUp(t->pos, rt->quantization);
assert(cur.stop != cur.start);
}
} else {
} else {
cur.start = rangeTrackerRoundDown(t->pos, rt->quantization);
in_range = 1;
assert(t->pins + t->delta);
}
have_range = 1;
} else {
} else {
assert(! t->pins);
assert(t->pins + t->delta);
// do we need to merge this transition with the range, or output the old range?
if(cur.stop >= rangeTrackerRoundDown(t->pos, rt->quantization)) {
if(cur.stop >= rangeTrackerRoundDown(t->pos, rt->quantization)) {
// do nothing; start position doesn't change.
} else {
} else {
// output old range, reset start position
rangeTrackerDelta(ret, &cur, delta);
cur.start = rangeTrackerRoundDown(t->pos, rt->quantization);
@ -273,25 +274,25 @@ static void pinnedRanges(const rangeTracker * rt, const range * request, rangeTr
}
assert(!in_range);
if(have_range) {
if(have_range) {
rangeTrackerDelta(ret, &cur, delta);
}
}
range ** rangeTrackerAdd(rangeTracker * rt, const range * rng) {
range ** rangeTrackerAdd(rangeTracker * rt, const range * rng) {
rangeTracker * ret = rangeTrackerInit(rt->quantization);
pinnedRanges(rt, rng, ret, 1);
rangeTrackerDelta(rt, rng, 1);
rangeTracker * ret2 = rangeTrackerInit(rt->quantization);
pinnedRanges(rt, rng, ret2, 1);
range ** ret_arry = rangeTrackerToArray(ret);
// remove the first array from the second...
int i = 0;
while(ret_arry[i]) {
while(ret_arry[i]) {
rangeTrackerDelta(ret2, ret_arry[i], -1);
// while we're at it, deinit the first range tracker
rangeTrackerDelta(ret, ret_arry[i], -1);
@ -300,11 +301,11 @@ range ** rangeTrackerAdd(rangeTracker * rt, const range * rng) {
}
free(ret_arry);
rangeTrackerDeinit(ret);
i = 0;
ret_arry = rangeTrackerToArray(ret2);
while(ret_arry[i]) {
while(ret_arry[i]) {
rangeTrackerDelta(ret2, ret_arry[i], -1);
i++;
}
@ -313,12 +314,12 @@ range ** rangeTrackerAdd(rangeTracker * rt, const range * rng) {
return ret_arry;
}
/**
/**
Remove a range
@return a null terminated array of newly-unpinned, quantized ranges
*/
range ** rangeTrackerRemove(rangeTracker * rt, const range * rang) {
range ** rangeTrackerRemove(rangeTracker * rt, const range * rang) {
rangeTracker * ret = rangeTrackerInit(rt->quantization);
pinnedRanges(rt, rang, ret, 1);
rangeTrackerDelta(rt, rang, -1);
@ -327,7 +328,7 @@ range ** rangeTrackerRemove(rangeTracker * rt, const range * rang) {
range ** ret_arry = rangeTrackerToArray(ret);
int i = 0;
while(ret_arry[i]) {
while(ret_arry[i]) {
rangeTrackerDelta(ret, ret_arry[i], -1);
i++;
}
@ -341,7 +342,7 @@ const transition ** rangeTrackerEnumerate(rangeTracker * rt) {
int transitionCount = 0;
const transition * t;
RBLIST * list = RB_ENTRY(openlist) (rt->ranges);
while((t = RB_ENTRY(readlist)(list))) {
while((t = RB_ENTRY(readlist)(list))) {
transitionCount++;
}
RB_ENTRY(closelist)(list);
@ -351,7 +352,7 @@ const transition ** rangeTrackerEnumerate(rangeTracker * rt) {
list = RB_ENTRY(openlist) (rt->ranges);
int i = 0;
while((t = RB_ENTRY(readlist)(list))) {
while((t = RB_ENTRY(readlist)(list))) {
ret[i] = t;
i++;
}
@ -360,14 +361,14 @@ const transition ** rangeTrackerEnumerate(rangeTracker * rt) {
return ret;
}
char * rangeToString(const range * r) {
char * rangeToString(const range * r) {
char * ret;
int err = asprintf(&ret, "[range %lld-%lld]", (long long)r->start, (long long)r->stop);
assert(err !=-1);
return ret;
}
char * transitionToString(const transition * t) {
char * transitionToString(const transition * t) {
char * ret;
int err = asprintf(&ret, "[transition pos=%lld delta=%d pins=%d]", (long long)t->pos, t->delta, t->pins);
assert(err !=-1);

View file

@ -1,12 +1,11 @@
#include <stasis/iterator.h>
#include <assert.h>
#include <stdlib.h>
#include <stasis/operations.h>
#include <stasis/arrayCollection.h>
#include <stasis/logger/logMemory.h>
#include <stasis/operations/group.h>
#include <assert.h>
static lladdIterator_def_t iterators[MAX_ITERATOR_TYPES];
void lladdIterator_register(int type, lladdIterator_def_t info) {

View file

@ -1,12 +1,10 @@
#include <stdlib.h>
#include <stasis/lhtable.h>
#include <stasis/hash.h>
#include <pbl/pbl.h>
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stasis/latches.h>
#include <assert.h>
#define FILL_FACTOR (0.5)
//#define MEASURE_GLOBAL_BUCKET_LENGTH
@ -21,9 +19,9 @@ static pthread_mutex_t stat_mutex = PTHREAD_MUTEX_INITIALIZER;
@file
In-memory hashtable implementation. It uses linear hashing
to incrementally grow the bucket list.
In-memory hashtable implementation. It uses linear hashing
to incrementally grow the bucket list.
Functions that end with "_r" are reentrant; those that do not are
not.
@ -48,14 +46,14 @@ struct LH_ENTRY(table) {
//===================================================== Static helper functions
static struct LH_ENTRY(pair_t) *
findInLinkedList(const void * key, int len,
struct LH_ENTRY(pair_t)* list,
struct LH_ENTRY(pair_t)** predecessor) {
static struct LH_ENTRY(pair_t) *
findInLinkedList(const void * key, int len,
struct LH_ENTRY(pair_t)* list,
struct LH_ENTRY(pair_t)** predecessor) {
int iters = 1;
*predecessor = 0;
while(list) {
if(len == list->keyLength && !memcmp(key, list->key, len)) {
while(list) {
if(len == list->keyLength && !memcmp(key, list->key, len)) {
#ifdef MEASURE_GLOBAL_BUCKET_LENGTH
pthread_mutex_lock(&stat_mutex);
totalIters += iters;
@ -82,13 +80,13 @@ findInLinkedList(const void * key, int len,
}
static LH_ENTRY(value_t) * removeFromLinkedList(struct LH_ENTRY(table) * table,
intptr_t bucket,
intptr_t bucket,
const LH_ENTRY(key_t)* key, int len){
struct LH_ENTRY(pair_t) * predecessor;
struct LH_ENTRY(pair_t) * thePair;
LH_ENTRY(value_t) * ret;
thePair = findInLinkedList(key, len,
&(table->bucketList[bucket]),
thePair = findInLinkedList(key, len,
&(table->bucketList[bucket]),
&predecessor);
if(!thePair) { // Not found; return null.
ret = 0;
@ -96,13 +94,13 @@ static LH_ENTRY(value_t) * removeFromLinkedList(struct LH_ENTRY(table) * table,
assert(thePair == &(table->bucketList[bucket]));
free((void*)thePair->key);
if(!thePair->next) {
if(!thePair->next) {
// End of list; need to copy next into bucketlist, and free it.
thePair->key = 0;
thePair->keyLength = 0;
ret = thePair->value;
thePair->value = 0;
} else {
} else {
// Freeing item in table->bucketList. Copy its next pair to
// bucketList, and free that item.
ret = thePair->value;
@ -119,12 +117,12 @@ static LH_ENTRY(value_t) * removeFromLinkedList(struct LH_ENTRY(table) * table,
return ret;
}
static struct LH_ENTRY(pair_t)* insertIntoLinkedList(struct LH_ENTRY(table) * table,
int bucket,
int bucket,
const LH_ENTRY(key_t) * key, int len,
LH_ENTRY(value_t) * value){
struct LH_ENTRY(pair_t) *thePair;
if(table->bucketList[bucket].key == 0) {
// The bucket's empty
// The bucket's empty
// Sanity checks...
assert(table->bucketList[bucket].keyLength == 0);
assert(table->bucketList[bucket].value == 0);
@ -134,9 +132,9 @@ static struct LH_ENTRY(pair_t)* insertIntoLinkedList(struct LH_ENTRY(table) * t
thePair->key = malloc(len);
thePair->keyLength = len;
memcpy(((void*)thePair->key), key, len);
thePair->value = value;
} else {
// the bucket isn't empty.
thePair->value = value;
} else {
// the bucket isn't empty.
thePair = malloc(sizeof(struct LH_ENTRY(pair_t)));
thePair->key = malloc(len);
memcpy((void*)thePair->key, key, len);
@ -147,26 +145,26 @@ static struct LH_ENTRY(pair_t)* insertIntoLinkedList(struct LH_ENTRY(table) * t
}
return thePair;
}
static void extendHashTable(struct LH_ENTRY(table) * table) {
static void extendHashTable(struct LH_ENTRY(table) * table) {
unsigned int maxExtension = stasis_util_two_to_the(table->bucketListBits-1);
// If table->bucketListNextExtension == maxExtension, then newBucket =
// twoToThe(table->bucketListBits), which is one higher than the hash can
// return.
if(table->bucketListNextExtension < maxExtension) {
if(table->bucketListNextExtension < maxExtension) {
table->bucketListNextExtension++;
} else {
table->bucketListNextExtension = 1;
table->bucketListBits ++;
maxExtension = stasis_util_two_to_the(table->bucketListBits-1);
}
unsigned int splitBucket = table->bucketListNextExtension - 1;
unsigned int newBucket = table->bucketListNextExtension - 1 + maxExtension;
// Assumes realloc is reasonably fast... This seems to be a good
// assumption under linux.
table->bucketList = realloc(table->bucketList,
table->bucketList = realloc(table->bucketList,
(1+newBucket) * sizeof(struct LH_ENTRY(pair_t)));
table->bucketListLength = 1+newBucket;
table->bucketList[newBucket].key = 0;
@ -177,27 +175,27 @@ static void extendHashTable(struct LH_ENTRY(table) * table) {
// Now, table->nextExtension, table->tableBits are correct, so we
// can call hash.
struct LH_ENTRY(pair_t) * splitBucketRoot =
struct LH_ENTRY(pair_t) * splitBucketRoot =
&(table->bucketList[splitBucket]);
while(splitBucketRoot->key &&
(stasis_linear_hash(splitBucketRoot->key, splitBucketRoot->keyLength,
(stasis_linear_hash(splitBucketRoot->key, splitBucketRoot->keyLength,
table->bucketListBits, table->bucketListNextExtension) ==
newBucket)) {
insertIntoLinkedList(table, newBucket,
splitBucketRoot->key, splitBucketRoot->keyLength,
insertIntoLinkedList(table, newBucket,
splitBucketRoot->key, splitBucketRoot->keyLength,
splitBucketRoot->value);
removeFromLinkedList(table, splitBucket,
removeFromLinkedList(table, splitBucket,
splitBucketRoot->key, splitBucketRoot->keyLength);
}
if(splitBucketRoot->key) {
assert(stasis_linear_hash(splitBucketRoot->key, splitBucketRoot->keyLength,
table->bucketListBits, table->bucketListNextExtension)
table->bucketListBits, table->bucketListNextExtension)
== splitBucket);
} else {
} else {
assert(!splitBucketRoot->next);
}
struct LH_ENTRY(pair_t) * next = splitBucketRoot->next;
while(next) {
while(next) {
// We know that next isn't the bucketList root, so removing it from
// the list doesn't change its successor.
struct LH_ENTRY(pair_t) * newNext = next->next;
@ -224,7 +222,7 @@ static void extendHashTable(struct LH_ENTRY(table) * table) {
struct LH_ENTRY(table) * LH_ENTRY(create)(int initialSize) {
struct LH_ENTRY(table) * ret = malloc(sizeof(struct LH_ENTRY(table)));
ret->bucketList = calloc(initialSize, sizeof(struct LH_ENTRY(pair_t)));
stasis_linear_hash_get_size_params(initialSize,
stasis_linear_hash_get_size_params(initialSize,
&(ret->bucketListBits),
&(ret->bucketListNextExtension));
ret->bucketListLength = initialSize;
@ -237,23 +235,23 @@ struct LH_ENTRY(table) * LH_ENTRY(create)(int initialSize) {
LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len,
LH_ENTRY(value_t) * value) {
LH_ENTRY(value_t) * value) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
intptr_t bucket = stasis_linear_hash(key, len,
intptr_t bucket = stasis_linear_hash(key, len,
table->bucketListBits, table->bucketListNextExtension);
struct LH_ENTRY(pair_t) * thePair = 0;
struct LH_ENTRY(pair_t) * junk;
LH_ENTRY(value_t) * ret;
if((thePair = findInLinkedList(key, len, &(table->bucketList[bucket]),
&junk))) { // , &iters))) {
if((thePair = findInLinkedList(key, len, &(table->bucketList[bucket]),
&junk))) { // , &iters))) {
// In this bucket.
ret = thePair->value;
thePair->value = value;
// Don't need to update occupancy.
} else {
} else {
// Not in this bucket
thePair = insertIntoLinkedList(table, bucket, key, len, value);
ret = 0;
@ -267,8 +265,8 @@ LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
assert(!memcmp(thePair->key, key, len));
struct LH_ENTRY(pair_t) * pairInBucket = 0;
// Is thePair in the bucket?
pairInBucket = findInLinkedList(key, len,
&(table->bucketList[bucket]),
pairInBucket = findInLinkedList(key, len,
&(table->bucketList[bucket]),
&junk);
assert(pairInBucket);
assert(pairInBucket == thePair);
@ -276,9 +274,9 @@ LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
assert(!findInLinkedList(key, len, pairInBucket->next, &junk));
} */
if(FILL_FACTOR < ( ((double)table->occupancy) /
if(FILL_FACTOR < ( ((double)table->occupancy) /
((double)table->bucketListLength)
)) {
)) {
extendHashTable(table);
}
#ifdef NAIVE_LOCKING
@ -289,7 +287,7 @@ LH_ENTRY(value_t) * LH_ENTRY(insert) (struct LH_ENTRY(table) * table,
}
LH_ENTRY(value_t) * LH_ENTRY(remove) (struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len) {
const LH_ENTRY(key_t) * key, int len) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
@ -306,7 +304,7 @@ LH_ENTRY(value_t) * LH_ENTRY(remove) (struct LH_ENTRY(table) * table,
}
LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table,
const LH_ENTRY(key_t) * key, int len) {
const LH_ENTRY(key_t) * key, int len) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(table->lock));
#endif
@ -316,23 +314,23 @@ LH_ENTRY(value_t) * LH_ENTRY(find)(struct LH_ENTRY(table) * table,
struct LH_ENTRY(pair_t) * predecessor;
struct LH_ENTRY(pair_t) * thePair;
// int iters;
thePair = findInLinkedList(key, len,
&(table->bucketList[bucket]),
thePair = findInLinkedList(key, len,
&(table->bucketList[bucket]),
&predecessor);
#ifdef NAIVE_LOCKING
pthread_mutex_unlock(&(table->lock));
#endif
if(!thePair) {
if(!thePair) {
return 0;
} else {
return thePair->value;
}
}
void LH_ENTRY(openlist)(const struct LH_ENTRY(table) * table,
struct LH_ENTRY(list) * list) {
void LH_ENTRY(openlist)(const struct LH_ENTRY(table) * table,
struct LH_ENTRY(list) * list) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(((struct LH_ENTRY(table)*)table)->lock));
#endif
@ -346,14 +344,14 @@ void LH_ENTRY(openlist)(const struct LH_ENTRY(table) * table,
}
const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list) {
const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list) {
#ifdef NAIVE_LOCKING
pthread_mutex_lock(&(((struct LH_ENTRY(table)*)(list->table))->lock));
#endif
assert(list->currentBucket != -2);
while(!list->nextPair) {
list->currentBucket++;
if(list->currentBucket == list->table->bucketListLength) {
if(list->currentBucket == list->table->bucketListLength) {
break;
}
if(list->table->bucketList[list->currentBucket].key) {
@ -361,7 +359,7 @@ const struct LH_ENTRY(pair_t)* LH_ENTRY(readlist)(struct LH_ENTRY(list) * list)
}
}
list->currentPair = list->nextPair;
if(list->currentPair) {
if(list->currentPair) {
list->nextPair = list->currentPair->next;
}
// XXX is it even meaningful to return a pair object on an unlocked hashtable?
@ -386,9 +384,9 @@ void LH_ENTRY(closelist)(struct LH_ENTRY(list) * list) {
void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) {
struct LH_ENTRY(list) l;
const struct LH_ENTRY(pair_t) * p;
LH_ENTRY(openlist)(t, &l);
while((p = LH_ENTRY(readlist)(&l))) {
while((p = LH_ENTRY(readlist)(&l))) {
LH_ENTRY(remove)(t, p->key, p->keyLength);
// We always remove the head of the list, which breaks
// the iterator. Reset the iterator to the beginning of the bucket.
@ -404,7 +402,7 @@ void LH_ENTRY(destroy) (struct LH_ENTRY(table) * t) {
free(t);
}
void LH_ENTRY(stats)(){
void LH_ENTRY(stats)(){
#ifdef MEASURE_GLOBAL_BUCKET_LENGTH
pthread_mutex_lock(&stat_mutex);
@ -420,7 +418,7 @@ void LH_ENTRY(stats)(){
pblHashTable_t * pblHtCreate( ) {
// return (pblHashTable_t*)LH_ENTRY(create)(2048);
return (pblHashTable_t*)LH_ENTRY(create)(16);
return (pblHashTable_t*)LH_ENTRY(create)(16);
}
int pblHtDelete ( pblHashTable_t * h ) {
LH_ENTRY(destroy)((struct LH_ENTRY(table)*)h);
@ -436,12 +434,12 @@ int pblHtInsert ( pblHashTable_t * h, const void * key, size_t keylen,
// 0 -> inserted successfully
if(LH_ENTRY(find)((struct LH_ENTRY(table)*)h, key, keylen)) {
if(firstPBLinsert) {
if(firstPBLinsert) {
fprintf(stderr, "lhtable.c: This code relies on PBL insert semantics...\n");
firstPBLinsert = 0;
}
return -1;
} else {
} else {
LH_ENTRY(insert)((struct LH_ENTRY(table)*)h, key, keylen, dataptr);
return 0;
}
@ -452,11 +450,11 @@ int pblHtRemove ( pblHashTable_t * h, const void * key, size_t keylen ) {
//-1 => not found (or error)
if(LH_ENTRY(remove)((struct LH_ENTRY(table)*)h, key, keylen)) {
return 0;
} else {
} else {
return -1;
}
}
void * pblHtLookup ( pblHashTable_t * h, const void * key, size_t keylen ) {
void * pblHtLookup ( pblHashTable_t * h, const void * key, size_t keylen ) {
// return values:
// 0 -> not found (or error)
return LH_ENTRY(find)((struct LH_ENTRY(table) *) h, key, keylen);
@ -471,16 +469,16 @@ void * pblHtFirst ( pblHashTable_t * h ) {
struct LH_ENTRY(list) *list = malloc(sizeof(struct LH_ENTRY(list)));
struct LH_ENTRY(list) * oldList;
if((oldList = LH_ENTRY(insert)(pblLists,
&h, sizeof(pblHashTable_t*),
list))) {
if((oldList = LH_ENTRY(insert)(pblLists,
&h, sizeof(pblHashTable_t*),
list))) {
LH_ENTRY(closelist)(oldList);
free(oldList);
}
LH_ENTRY(openlist)((struct LH_ENTRY(table)*)h,
}
LH_ENTRY(openlist)((struct LH_ENTRY(table)*)h,
list);
const struct LH_ENTRY(pair_t) * p = LH_ENTRY(readlist)(list);
if(p) {
if(p) {
return p->value;
} else {
oldList = LH_ENTRY(remove)(pblLists, &h, sizeof(pblHashTable_t*));
@ -489,33 +487,33 @@ void * pblHtFirst ( pblHashTable_t * h ) {
}
}
void * pblHtNext ( pblHashTable_t * h ) {
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
&h, sizeof(pblHashTable_t*));
assert(list);
const struct LH_ENTRY(pair_t) * p = LH_ENTRY(readlist)(list);
if(p) {
if(p) {
return p->value;
} else {
struct LH_ENTRY(list)* oldList =
struct LH_ENTRY(list)* oldList =
LH_ENTRY(remove)(pblLists, &h, sizeof(pblHashTable_t*));
free(oldList);
return 0;
}
}
void * pblHtCurrent ( pblHashTable_t * h ) {
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
&h, sizeof(pblHashTable_t*));
if(list && list->currentPair)
if(list && list->currentPair)
return list->currentPair->value;
else
else
return 0;
}
void * pblHtCurrentKey ( pblHashTable_t * h ) {
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
struct LH_ENTRY(list) *list = LH_ENTRY(find)(pblLists,
&h, sizeof(pblHashTable_t*));
if(list && list->currentPair)
return (void*)list->currentPair->key;
else
else
return 0;
}

View file

@ -3,7 +3,7 @@ This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
@ -13,20 +13,20 @@ authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
@ -44,13 +44,8 @@ terms specified in this license.
*
* simple linked list
*****************************/
#include <config.h>
#include <stasis/common.h>
#include <stasis/linkedlist.h>
#include <stdio.h>
void printList(LinkedList **l) {
LinkedList * tmp = *l;
printf ("List is ");

View file

@ -1,21 +1,19 @@
#include <pbl/pbl.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
#include <stasis/latches.h>
#include <stasis/hash.h>
#include <sys/time.h>
#include <time.h>
#include <stasis/latches.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <stasis/hash.h>
#define MUTEX_COUNT 32
// These next two correspond to MUTEX count, and are the appropriate values to pass into hash().
#define MUTEX_BITS 5
#define MUTEX_EXT 32
#define MUTEX_EXT 32
static pthread_mutex_t mutexes[MUTEX_COUNT];
@ -43,7 +41,7 @@ void lockManagerInitHashed() {
}
xidLockTable = pblHtCreate();
ridLockTable = pblHtCreate();
}
pblHashTable_t * lockManagerBeginTransactionUnlocked(int xid) {
pblHashTable_t * xidLocks = pblHtCreate();
@ -69,7 +67,7 @@ lock* createLock(byte * dat, int datLen) {
ret->readers = 0;
ret->writers = 0;
ret->waiting = 0;
pblHtInsert(ridLockTable, dat, datLen, ret);
return ret;
}
@ -113,7 +111,7 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
struct timeval tv;
int tod_ret = gettimeofday (&tv, NULL);
tv.tv_sec++; // Wait up to one second to obtain a lock before detecting deadlock.
struct timespec ts;
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
@ -129,7 +127,7 @@ int lockManagerReadLockHashed(int xid, byte * dat, int datLen) {
return LLADD_DEADLOCK;
}
} while(ridLock->writers);
}
}
if(currentLockLevel < LM_READLOCK) {
ridLock->readers++;
pblHtRemove(xidLocks, dat, datLen);
@ -176,7 +174,7 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
struct timeval tv;
int tod_ret = gettimeofday(&tv, NULL);
tv.tv_sec++;
struct timespec ts;
struct timespec ts;
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = tv.tv_usec * 1000;
if(tod_ret != 0) {
@ -184,7 +182,7 @@ int lockManagerWriteLockHashed(int xid, byte * dat, int datLen) {
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
while(ridLock->writers || (ridLock->readers - me)) {
while(ridLock->writers || (ridLock->readers - me)) {
int lockret = pthread_cond_timedwait(&ridLock->writeOK, mut, &ts);
if(lockret == ETIMEDOUT) {
ridLock->waiting--;
@ -288,7 +286,7 @@ int lockManagerCommitHashed(int xid, int datLen) {
int tmpret = decrementLock(currentKey, datLen, currentLevel);
// Pass any error(s) up to the user.
// (This logic relies on the fact that currently it only returns 0 and LLADD_INTERNAL_ERROR)
if(tmpret) {
if(tmpret) {
ret = tmpret;
}
}

View file

@ -1,21 +1,15 @@
#define _SVID_SOURCE
#define _BSD_SOURCE
#include <dirent.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <stasis/common.h>
#include <stasis/latches.h>
#include <stasis/logger/filePool.h>
#include <stdio.h>
/**
@see stasis_log_safe_writes_state for more documentation;
identically named fields serve analagous purposes.

View file

@ -39,10 +39,6 @@ authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <stdlib.h>
#include <stasis/common.h>
#include <stasis/logger/logHandle.h>
struct LogHandle {

View file

@ -2,13 +2,11 @@
NOTE: Person who's using the consumer interface calls close first, (for now).
*/
#include <stdlib.h>
#include <assert.h>
#include <stasis/logger/logMemory.h>
#include <errno.h>
#include <stasis/compensations.h>
#include <assert.h>
typedef struct {
pthread_mutex_t mutex;
pthread_mutex_t readerMutex;
@ -28,12 +26,12 @@ void logMemory_init() {
}
lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
lladdFifo_t * fifo = (lladdFifo_t *) malloc(sizeof(lladdFifo_t));
lladdIterator_t * iterator = (lladdIterator_t *) malloc(sizeof(lladdIterator_t));
iterator->type = LOG_MEMORY_ITERATOR;
iterator->impl = malloc(sizeof(logMemory_fifo_t));
iterator->impl = malloc(sizeof(logMemory_fifo_t));
((logMemory_fifo_t *)iterator->impl)->ringBuffer = openLogRingBuffer(size, initialOffset);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->mutex), NULL);
pthread_mutex_init(&(((logMemory_fifo_t *)iterator->impl)->readerMutex), NULL);
@ -41,17 +39,17 @@ lladdFifo_t * logMemoryFifo(size_t size, lsn_t initialOffset) {
pthread_cond_init (&(((logMemory_fifo_t *)iterator->impl)->writeReady), NULL);
((logMemory_fifo_t *)iterator->impl)->cached_value = NULL;
((logMemory_fifo_t *)iterator->impl)->eof = -1;
lladdConsumer_t * consumer = (lladdConsumer_t *) malloc(sizeof(lladdConsumer_t));
consumer->type = LOG_MEMORY_CONSUMER;
consumer->impl = iterator->impl;
consumer->impl = iterator->impl;
fifo->iterator = iterator;
fifo->consumer = consumer;
fifo->consumer = consumer;
return fifo;
}
}
@ -73,7 +71,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
size_t size;
lsn_t lsn;
lsn_t lsn;
int ret;
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
@ -91,8 +89,8 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
}
if (ret == -1) {
}
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
@ -100,10 +98,10 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
@ -113,18 +111,18 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
fifo->cached_value = tmp;
fifo->cached_value_size = size;
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
@ -132,7 +130,7 @@ compensated_function int logMemory_Iterator_next (int xid, void * impl) {
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
/** @todo logMemory_Iterator_tryNext is a cut and pasted version of
@ -146,25 +144,25 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
}
pthread_mutex_lock(&(fifo->mutex));
size_t size;
lsn_t lsn;
lsn_t lsn;
int ret;
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
// TODO Check to see if we're done reading...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) {
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return 0;
}
if (ret == -1) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
@ -172,10 +170,10 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
@ -189,14 +187,14 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
@ -204,7 +202,7 @@ compensated_function int logMemory_Iterator_tryNext (int xid, void * impl) {
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
compensated_function void logMemory_Iterator_releaseLock (int xid, void * impl) {
@ -234,18 +232,18 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
pthread_mutex_lock(&(fifo->readerMutex));
pthread_mutex_lock(&(fifo->mutex));
size_t size;
lsn_t lsn;
lsn_t lsn;
int ret;
if(fifo->eof != -1 && fifo->eof == ringBufferReadPosition(fifo->ringBuffer)) {
/* pthread_mutex_unlock(&(fifo->mutex));
/* pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex)); */
return 0;
}
// TODO Check to see if we're done reading...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
//From here on, we need to continue as normal since we consumed data from the ringbuffer...
if(-2 == (ret = ringBufferTruncateRead((byte *)&size, fifo->ringBuffer, sizeof(size_t)))) {
/* pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex)); */
@ -256,7 +254,7 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
return 0;
}
if (ret == -1) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
@ -264,10 +262,10 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
}
assert(!ret);
byte * tmp;
tmp = realloc(fifo->cached_value, size);
tmp = realloc(fifo->cached_value, size);
if(tmp == NULL) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
@ -281,14 +279,14 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
while(-2 == (lsn = ringBufferTruncateRead( fifo->cached_value, fifo->ringBuffer, size))) {
pthread_cond_wait(&(fifo->readReady), &(fifo->mutex));
}
if (ret == -1) {
if (ret == -1) {
compensation_set_error(LLADD_INTERNAL_ERROR);
pthread_mutex_unlock(&(fifo->mutex));
pthread_mutex_unlock(&(fifo->readerMutex));
return LLADD_INTERNAL_ERROR;
}
assert(!ret);
fifo->cached_lsn = (lsn_t)lsn;
@ -296,7 +294,7 @@ compensated_function int logMemory_Iterator_nextOrEmpty (int xid, void * impl) {
pthread_cond_broadcast(&(fifo->writeReady));
pthread_mutex_unlock(&(fifo->mutex));
return 1;
}
/* return the lsn */
@ -325,7 +323,7 @@ void logMemory_consumer_close(int xid, void *it){
pthread_mutex_lock(&(fifo->mutex));
fifo->eof = ringBufferAppendPosition(fifo->ringBuffer);
assert(fifo->eof != -1);
pthread_cond_broadcast(&(fifo->readReady)); // There may have been threads waiting on the next tuple before close was called.
pthread_cond_broadcast(&(fifo->readReady)); // There may have been threads waiting on the next tuple before close was called.
pthread_mutex_unlock(&(fifo->mutex));
}
@ -338,8 +336,8 @@ int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte
logMemory_fifo_t * fifo = (logMemory_fifo_t *) (it);
pthread_mutex_lock(&(fifo->mutex));
while(-2 == (ret = ringBufferAppend(fifo->ringBuffer,
(byte *)&valSize,
while(-2 == (ret = ringBufferAppend(fifo->ringBuffer,
(byte *)&valSize,
sizeof(size_t) ))) {
pthread_cond_wait(&(fifo->writeReady), &(fifo->mutex));
}
@ -355,7 +353,7 @@ int logMemory_consumer_push(int xid, /*lladdConsumer_t * cons*/ void * it, byte
compensation_set_error(LLADD_INTERNAL_ERROR);
return LLADD_INTERNAL_ERROR;
}
pthread_cond_broadcast(&(fifo->readReady));
pthread_mutex_unlock(&(fifo->mutex));

View file

@ -1,7 +1,5 @@
#include <stdio.h>
#include <stasis/logger/logWriterUtils.h>
#include <stdlib.h>
/** @file
/** @file
This file contains old-ish utility methods that wrap fseek, read, etc...

View file

@ -1,6 +1,7 @@
#include <stasis/transactional.h>
#include <stasis/logger/reorderingHandle.h>
#include <string.h>
#include <stdio.h>
long stasis_log_reordering_usleep_after_flush = 0;

View file

@ -41,15 +41,6 @@ terms specified in this license.
---*/
#include <sys/types.h>
#include <sys/stat.h>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <assert.h>
#include <config.h>
#include <stasis/common.h>
@ -60,6 +51,10 @@ terms specified in this license.
#include <stasis/logger/logWriterUtils.h>
#include <stasis/logger/logHandle.h>
#include <assert.h>
#include <stdio.h>
#include <fcntl.h>
/**
* @file
*

View file

@ -1,19 +1,18 @@
#include <stasis/multiplexer.h>
#include <stasis/crc32.h>
#include <stdlib.h>
#include <stasis/operations/linearHashNTA.h>
#include <stasis/logger/logMemory.h>
lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
lladdMultiplexer_t * lladdMultiplexer_alloc(int xid, lladdIterator_t * it,
void (*multiplexer)(byte * key,
size_t keySize,
byte * value,
size_t valueSize,
size_t keySize,
byte * value,
size_t valueSize,
byte ** multiplexKey,
size_t * multiplexKeySize),
/* lladdConsumer_t * getConsumer(struct lladdFifoPool_t* fifoPool,
byte* multiplexKey,
byte* multiplexKey,
size_t multiplexKeySize), */
lladdFifoPool_t * fifoPool) {
lladdMultiplexer_t * ret = malloc(sizeof(lladdMultiplexer_t));
@ -37,14 +36,14 @@ int lladdMultiplexer_join(lladdMultiplexer_t * multiplexer) {
return pthread_join(multiplexer->worker,NULL);
}
void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
// lladdMultiplexer_t * m = arg;
lladdConsumer_t * consumer;
while(Titerator_tryNext(m->xid, m->it)) {
byte * mkey, * key, * value;
size_t mkeySize, keySize, valueSize;
keySize = Titerator_key (m->xid, m->it, &key);
valueSize = Titerator_value(m->xid, m->it, &value);
@ -56,12 +55,12 @@ void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
Titerator_tupleDone(m->xid, m->it);
lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo);
}
// iterate over pblhash, closing consumers.
/* Titerator_close(m->xid, m->it);
// @todo Does this belong in its own function in fifo.c?
// @todo Does this belong in its own function in fifo.c?
lladdFifoPool_t * pool = m->fifoPool;
int i;
@ -77,14 +76,14 @@ void * lladdMultiplexer_flush(lladdMultiplexer_t * m) {
}
void * multiplexer_worker(void * arg) {
void * multiplexer_worker(void * arg) {
lladdMultiplexer_t * m = arg;
lladdConsumer_t * consumer;
while(Titerator_next(m->xid, m->it)) {
byte * mkey, * key, * value;
size_t mkeySize, keySize, valueSize;
keySize = Titerator_key (m->xid, m->it, &key);
valueSize = Titerator_value(m->xid, m->it, &value);
@ -96,7 +95,7 @@ void * multiplexer_worker(void * arg) {
Titerator_tupleDone(m->xid, m->it);
lladdFifoPool_markDirty(m->xid, m->fifoPool, fifo);
}
// iterate over pblhash, closing consumers.
Titerator_close(m->xid, m->it);
@ -122,9 +121,9 @@ void * multiplexer_worker(void * arg) {
*/
void multiplexHashLogByKey(byte * key,
size_t keySize,
byte * value,
size_t valueSize,
size_t keySize,
byte * value,
size_t valueSize,
byte ** multiplexKey,
size_t * multiplexKeySize) {
// We don't care what the key is. It's probably an LSN.
@ -149,7 +148,7 @@ void multiplexHashLogByKey(byte * key,
break;
case OPERATION_LINEAR_HASH_REMOVE:
{
linearHash_insert_arg * arg = (linearHash_insert_arg*)updateArgs; // this *is* correct. Don't ask....
linearHash_insert_arg * arg = (linearHash_insert_arg*)updateArgs; // this *is* correct. Don't ask....
*multiplexKey = (byte*) (arg + 1);
*multiplexKeySize = arg->keySize;
}

View file

@ -1,41 +1,39 @@
#define __USE_GNU
#define __USE_GNU
#define _GNU_SOURCE
#include <stasis/latches.h>
#include <stasis/transactional.h>
#include <stasis/hash.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
/*#ifndef PTHREAD_MUTEX_RECURSIVE
#define PTHREAD_MUTEX_RECURSIVE PTHREAD_MUTEX_RECURSIVE_NP
#endif*/
/** A quick note on the format of linked lists. Each entry consists
/** A quick note on the format of linked lists. Each entry consists
of a struct with some variable length data appended to it.
To access an entry's contents:
stasis_linkedList_entry * entry;
stasis_linkedList_entry * entry;
...
if(entry->size) {
key = (byte*)(entry + 1);
value = ((byte*)(entry+1)) + keySize;
} else {
entry->size must be nonzero if the entry is defined. It will be
zero if the entry is uniniailized (this can be the case if the
list has not yet been initialized. The end of the list is marked
entry->size must be nonzero if the entry is defined. It will be
zero if the entry is uniniailized (this can be the case if the
list has not yet been initialized. The end of the list is marked
by a next field with size -1.
}
To get the successor in the list:
stasis_linkedList_entry next = entry->next;
@file
*/
@ -55,7 +53,7 @@ compensated_function static void stasis_linked_list_insert_helper(int xid, recor
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize);
typedef struct {
recordid list;
int keySize;
int keySize;
} stasis_linked_list_insert_log;
typedef struct {
recordid list;
@ -66,11 +64,11 @@ typedef struct {
compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Page* p) {
assert(!p);
stasis_linked_list_remove_log * log = (stasis_linked_list_remove_log*)getUpdateArgs(e);;
byte * key;
byte * value;
int keySize, valueSize;
keySize = log->keySize;
valueSize = log->valueSize;
key = (byte*)(log+1);
@ -83,16 +81,16 @@ compensated_function static int op_linked_list_nta_insert(const LogEntry* e, Pag
stasis_linked_list_insert_helper(e->xid, log->list, key, keySize, value, valueSize);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return 0;
}
compensated_function static int op_linked_list_nta_remove(const LogEntry *e, Page* p) {
assert(!p);
stasis_linked_list_remove_log * log = (stasis_linked_list_remove_log*)getUpdateArgs(e);
byte * key;
int keySize;
keySize = log->keySize;
key = (byte*)(log+1);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
@ -110,41 +108,41 @@ compensated_function int TlinkedListInsert(int xid, recordid list, const byte *
/* try_ret(compensation_error()) {
ret = TlinkedListRemove(xid, list, key, keySize);
} end_ret(compensation_error()); */
stasis_linked_list_insert_log * undoLog = malloc(sizeof(stasis_linked_list_insert_log) + keySize);
undoLog->list = list;
undoLog->keySize = keySize;
memcpy(undoLog+1, key, keySize);
pthread_mutex_lock(&stasis_linked_list_mutex);
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_INSERT,
(byte*)undoLog, sizeof(stasis_linked_list_insert_log) + keySize);
free(undoLog);
stasis_linked_list_insert_helper(xid, list, key, keySize, value, valueSize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
// pthread_mutex_unlock(&linked_list_mutex);
return ret;
return ret;
}
stasis_operation_impl stasis_op_impl_linked_list_insert() {
stasis_operation_impl o = {
stasis_operation_impl o = {
OPERATION_LINKED_LIST_INSERT,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_NOOP,
OPERATION_LINKED_LIST_REMOVE,
&op_linked_list_nta_insert
};
return o;
}
stasis_operation_impl stasis_op_impl_linked_list_remove() {
stasis_operation_impl o = {
stasis_operation_impl o = {
OPERATION_LINKED_LIST_REMOVE,
UNKNOWN_TYPE_PAGE,
OPERATION_NOOP,
OPERATION_NOOP,
OPERATION_LINKED_LIST_INSERT,
&op_linked_list_nta_remove
};
@ -154,9 +152,9 @@ compensated_function static void stasis_linked_list_insert_helper(int xid, recor
//int ret = Tli nkedListRemove(xid, list, key, keySize);
try {
stasis_linkedList_entry * entry = malloc(sizeof(stasis_linkedList_entry) + keySize + valueSize);
Tread(xid, list, entry);
if(!entry->next.size) {
memcpy(entry+1, key, keySize);
@ -188,19 +186,19 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
pthread_mutex_lock(&stasis_linked_list_mutex);
Tread(xid, list, entry);
} end_action_ret(-2);
if(!entry->next.size) {
free(entry);
free(entry);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return -1; // empty list
return -1; // empty list
}
int done = 0;
int ret = -1;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, -2) {
while(!done) {
if(!memcmp(entry + 1, key, keySize)) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
int valueSize = list.size - (sizeof(stasis_linkedList_entry) + keySize);
*value = malloc(valueSize);
@ -215,7 +213,7 @@ compensated_function int TlinkedListFind(int xid, recordid list, const byte * ke
done = 1;
}
}
free(entry);
free(entry);
} compensate_ret(-2);
return ret;
@ -241,25 +239,25 @@ compensated_function int TlinkedListRemove(int xid, recordid list, const byte *
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
int entrySize = sizeof(stasis_linked_list_remove_log) + keySize + valueSize;
stasis_linked_list_remove_log * undoLog = malloc(entrySize);
undoLog->list = list;
undoLog->keySize = keySize;
undoLog->valueSize = valueSize;
memcpy(undoLog+1, key, keySize);
memcpy(((byte*)(undoLog+1))+keySize, value, valueSize);
// printf("entry size %d sizeof(remove_log)%d keysize %d valuesize %d sizeof(rid) %d key %d value {%d %d %ld}\n",
// entrySize, sizeof(stasis_linked_list_remove_log), keySize, valueSize, sizeof(recordid), key, value->page, value->slot, value->size);
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
void * handle = TbeginNestedTopAction(xid, OPERATION_LINKED_LIST_REMOVE,
(byte*)undoLog, entrySize);
free(value);
free(undoLog);
stasis_linked_list_remove_helper(xid, list, key, keySize);
TendNestedTopAction(xid, handle);
} compensate_ret(compensation_error());
return 1;
return 1;
}
compensated_function static int stasis_linked_list_remove_helper(int xid, recordid list, const byte * key, int keySize) {
@ -283,10 +281,10 @@ compensated_function static int stasis_linked_list_remove_helper(int xid, record
int ret = 0;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
while(1) {
if(compensation_error()) { break; }
if(!memcmp(entry + 1, key, keySize)) {
if(!memcmp(entry + 1, key, keySize)) {
// Bucket contains the entry of interest.
if(listRoot) {
if(entry->next.size == -1) {
@ -325,7 +323,7 @@ compensated_function static int stasis_linked_list_remove_helper(int xid, record
break;
}
}
}
}
free(entry);
} compensate_ret(compensation_error());
@ -356,7 +354,7 @@ compensated_function int TlinkedListMove(int xid, recordid start_list, recordid
}
compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueSize) {
recordid ret;
try_ret(NULLRID) {
try_ret(NULLRID) {
ret = Talloc(xid, sizeof(stasis_linkedList_entry) + keySize + valueSize);
byte * cleared = calloc(sizeof(stasis_linkedList_entry) + keySize + valueSize, sizeof(byte));
Tset(xid, ret, cleared);
@ -367,14 +365,14 @@ compensated_function recordid TlinkedListCreate(int xid, int keySize, int valueS
compensated_function void TlinkedListDelete(int xid, recordid list) {
try {
stasis_linkedList_entry * entry = malloc(list.size);
Tread(xid, list, entry);
Tdealloc(xid, list);
if(entry->next.size == 0) {
return;
}
while(entry->next.size != -1) {
if(compensation_error()) { break; }
recordid nextEntry;
@ -382,7 +380,7 @@ compensated_function void TlinkedListDelete(int xid, recordid list) {
assert(!memcmp(&nextEntry, &(entry->next), sizeof(recordid)));
Tdealloc(xid, nextEntry);
}
free(entry);
} end;
}
@ -410,7 +408,7 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
stasis_linkedList_entry * entry;
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
pthread_mutex_lock(&stasis_linked_list_mutex);
if(it->first == -1) {
it->first = 1;
} else if(it->first) {
@ -434,9 +432,9 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
}
} end_action_ret(compensation_error());
if(done) {
if(done) {
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
return ret;
}
begin_action_ret(pthread_mutex_unlock, &stasis_linked_list_mutex, compensation_error()) {
@ -451,19 +449,19 @@ compensated_function int TlinkedListNext(int xid, stasis_linkedList_iterator * i
*valueSize = it->valueSize;
*key = malloc(*keySize);
*value = malloc(*valueSize);
it->next = entry->next;
memcpy(*key, entry+1, *keySize);
memcpy(*value, ((byte*)(entry + 1))+*keySize, *valueSize);
ret = 1;
ret = 1;
} else {
// This entry was empty (this case occurs with empty lists)
ret = 0;
}
free(entry);
free(entry);
pthread_mutex_unlock(&stasis_linked_list_mutex);
return ret;
}

View file

@ -1,7 +1,9 @@
#include <stasis/operations.h>
#include <stasis/page.h>
#include <stasis/logger/reorderingHandle.h>
#include <string.h>
#include <stdio.h>
static int op_lsn_free_set(const LogEntry *e, Page *p) {
if(p->pageType != SLOTTED_LSN_FREE_PAGE) { abort() ; }
assert(e->update.arg_size >= (sizeof(pageoff_t) * 2));

View file

@ -1,7 +1,5 @@
#include <stasis/operations.h>
#include <stasis/hash.h>
#include <limits.h>
#include <assert.h>
#include <stasis/latches.h>
#include <stasis/page.h>
@ -20,10 +18,9 @@
#define headerHashBits (headerRidB->page)
#define headerNextSplit (headerRidB->slot)
#include <math.h>
#include <stdlib.h>
#include <string.h>
#include <pbl/pbl.h>
#include <assert.h>
/**
next.size == 0 implies empty bucket
next.size == -1 implies end of list

View file

@ -1,13 +1,8 @@
#define _XOPEN_SOURCE 600
#include <stdlib.h>
#include <config.h>
#include <stasis/page.h>
#include <stasis/logger/logEntry.h>
#include <stasis/operations/pageOperations.h>
#include <stasis/operations/regions.h>
#include <assert.h>
#include <alloca.h>
static pthread_mutex_t pageAllocMutex;

View file

@ -1,9 +1,6 @@
#include <stasis/transactional.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
typedef struct {
short nextEntry;

View file

@ -49,9 +49,9 @@ terms specified in this license.
#include <stasis/operations.h>
#include <stasis/logger/logger2.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
recordid prepare_bogus_rec = { 0, 0, 0};
static int op_prepare(const LogEntry * e, Page * p) {

View file

@ -67,18 +67,10 @@ terms specified in this license.
*/
#include <stdlib.h>
#include <config.h>
#include <stasis/common.h>
#include <stasis/latches.h>
#include <stasis/page.h>
#include <assert.h>
#include <stdio.h>
#include <stasis/constants.h>
#include <assert.h>
#include <stasis/blobManager.h>
#include <stasis/lockManager.h>
#include <stasis/compensations.h>
@ -88,6 +80,8 @@ terms specified in this license.
#include <stasis/bufferPool.h>
#include <stasis/truncation.h>
#include <assert.h>
static page_impl page_impls[MAX_PAGE_TYPE];
static stasis_dirty_page_table_t * dirtyPages;

View file

@ -1,7 +1,7 @@
#include "config.h"
#include <stasis/page.h>
#include <stasis/page/slotted.h>
#include <assert.h>
//#include <assert.h>
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
@ -21,7 +21,6 @@
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif
#include <stasis/truncation.h>
/**
Run sanity checks to make sure page is in a consistent state.

View file

@ -1,8 +1,7 @@
#include <stasis/pageHandle.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
/**
@todo Make sure this doesn't need to be atomic. (It isn't!) Can

View file

@ -24,9 +24,7 @@ static char rcsid[]="$Id$";
** exactly the same
*/
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <stasis/common.h>
#include <stasis/redblack.h>
#define assert(expr)
@ -146,7 +144,7 @@ RB_STATIC struct RB_ENTRY(tree) *RB_ENTRY(init)(void)
if ((retval=(struct RB_ENTRY(tree) *) malloc(sizeof(struct RB_ENTRY(tree))))==NULL)
return(NULL);
#ifndef RB_CUSTOMIZE
retval->rb_cmp=cmp;
retval->rb_config=config;
@ -165,7 +163,7 @@ RB_ENTRY(destroy)(struct RB_ENTRY(tree) *rbinfo)
if (rbinfo->rb_root!=RBNULL)
RB_ENTRY(_destroy)(rbinfo->rb_root);
free(rbinfo);
}
#endif /* no_destroy */
@ -186,7 +184,7 @@ RB_ENTRY(search)(const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *rbinfo)
#endif /* no_search */
#ifndef no_find
RB_STATIC const RB_ENTRY(data_t) *
RB_STATIC const RB_ENTRY(data_t) *
RB_ENTRY(find)(const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *rbinfo)
{
struct RB_ENTRY(node) *x;
@ -205,7 +203,7 @@ RB_ENTRY(find)(const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *rbinfo)
#endif /* no_find */
#ifndef no_delete
RB_STATIC const RB_ENTRY(data_t) *
RB_STATIC const RB_ENTRY(data_t) *
RB_ENTRY(delete)(const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *rbinfo)
{
struct RB_ENTRY(node) *x;
@ -251,7 +249,7 @@ RB_ENTRY(openlist)(const struct RB_ENTRY(tree) *rbinfo)
return(RB_ENTRY(_openlist)(rbinfo->rb_root));
}
RB_STATIC const RB_ENTRY(data_t) *
RB_STATIC const RB_ENTRY(data_t) *
RB_ENTRY(readlist)(RBLIST *rblistp)
{
if (rblistp==NULL)
@ -271,7 +269,7 @@ RB_ENTRY(closelist)(RBLIST *rblistp)
#endif /* no_readlist */
#ifndef no_lookup
RB_STATIC const RB_ENTRY(data_t) *
RB_STATIC const RB_ENTRY(data_t) *
RB_ENTRY(lookup)(int mode, const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *rbinfo)
{
struct RB_ENTRY(node) *x;
@ -492,18 +490,18 @@ RB_ENTRY(_lookup)(int mode, const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *
x=x->right;
else
found=1;
first = 0;
}
assert(!first);
if (found && (mode==RB_LUEQUAL || mode==RB_LUGTEQ || mode==RB_LULTEQ))
return(x);
if (!found && (mode==RB_LUEQUAL || mode==RB_LUNEXT || mode==RB_LUPREV))
return(RBNULL);
if (mode==RB_LUGTEQ || (!found && mode==RB_LUGREAT))
{
if (cmp>0)
@ -525,7 +523,7 @@ RB_ENTRY(_lookup)(int mode, const RB_ENTRY(data_t) *key, struct RB_ENTRY(tree) *
if (mode==RB_LUPREV || (found && mode==RB_LULESS))
return(RB_ENTRY(_predecessor)(x));
/* Shouldn't get here */
return(RBNULL);
}
@ -891,7 +889,7 @@ RB_ENTRY(_openlist)(const struct RB_ENTRY(node) *rootp)
return(rblistp);
}
static const RB_ENTRY(data_t) *
static const RB_ENTRY(data_t) *
RB_ENTRY(_readlist)(RBLIST *rblistp)
{
const RB_ENTRY(data_t) *key=NULL;
@ -1024,7 +1022,7 @@ RB_ENTRY(_check1)(struct RB_ENTRY(node) *x)
if (rb_check1(x->left))
return(1);
}
}
if (x->right != RBNULL)
{
@ -1036,7 +1034,7 @@ RB_ENTRY(_check1)(struct RB_ENTRY(node) *x)
if (rb_check1(x->right))
return(1);
}
}
return(0);
}
@ -1085,7 +1083,7 @@ RB_ENTRY(dumptree)(struct RB_ENTRY(node) *x, int n)
RB_ENTRY(dumptree)(x->left, n);
RB_ENTRY(dumptree)(x->right, n);
}
}
}
#endif

View file

@ -1,13 +1,10 @@
#include <config.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <stasis/common.h>
#include <stasis/lhtable.h>
#include <stasis/redblack.h>
#include <stasis/replacementPolicy.h>
#include <assert.h>
typedef struct stasis_replacement_policy_lru_entry {
void * value;
uint64_t clock;

View file

@ -1,7 +1,5 @@
#include <config.h>
#include <stdlib.h>
#include <stasis/common.h>
#include <stasis/replacementPolicy.h>
//#include <stasis/lhtable.h>
#include <stasis/doubleLinkedList.h>
#include <assert.h>
@ -9,16 +7,16 @@ typedef LL_ENTRY(value_t) value_t;
typedef struct LL_ENTRY(node_t) node_t;
typedef struct LL_ENTRY(list) list;
typedef struct lruFast {
typedef struct lruFast {
// struct LH_ENTRY(table) * hash;
struct LL_ENTRY(list) * lru;
node_t * (*getNode)(void * page, void * conf);
void (*setNode)(void * page, node_t * n,
void (*setNode)(void * page, node_t * n,
void * conf);
void * conf;
} lruFast;
static void hit(struct replacementPolicy * r, void * p) {
static void hit(struct replacementPolicy * r, void * p) {
lruFast * l = r->impl;
// node_t * n = LH_ENTRY(find)(l->hash, &id, sizeof(int));
node_t * n = l->getNode(p, l->conf);
@ -26,11 +24,11 @@ static void hit(struct replacementPolicy * r, void * p) {
LL_ENTRY(removeNoFree)(l->lru, n);
LL_ENTRY(pushNode)(l->lru, n);
}
static void* getStale(struct replacementPolicy * r) {
static void* getStale(struct replacementPolicy * r) {
lruFast * l = r->impl;
return LL_ENTRY(head)(l->lru);
}
static void* remove(struct replacementPolicy* r, void * p) {
static void* remove(struct replacementPolicy* r, void * p) {
lruFast * l = r->impl;
node_t * n = l->getNode(p, l->conf); //LH_ENTRY(remove)(l->hash, &id, sizeof(int));
assert(n);
@ -39,16 +37,16 @@ static void* remove(struct replacementPolicy* r, void * p) {
l->setNode(p, 0, l->conf);
return v;
}
static void insert(struct replacementPolicy* r,
void * p) {
static void insert(struct replacementPolicy* r,
void * p) {
lruFast * l = r->impl;
node_t * n = LL_ENTRY(push)(l->lru, p);
// LH_ENTRY(insert)(l->hash, &id, sizeof(int), n);
l->setNode(p, n, l->conf);
}
static void deinit(struct replacementPolicy * r) {
static void deinit(struct replacementPolicy * r) {
lruFast * l = r->impl;
// the node_t's get freed by LL_ENTRY. It's the caller's
// the node_t's get freed by LL_ENTRY. It's the caller's
// responsibility to free the void *'s passed into us.
//LH_ENTRY(destroy)(l->hash);
LL_ENTRY(destroy)(l->lru);
@ -57,10 +55,10 @@ static void deinit(struct replacementPolicy * r) {
}
replacementPolicy * lruFastInit(
struct LL_ENTRY(node_t) * (*getNode)(void * page, void * conf),
void (*setNode)(void * page,
void (*setNode)(void * page,
struct LL_ENTRY(node_t) * n,
void * conf),
void * conf) {
void * conf) {
struct replacementPolicy * ret = malloc(sizeof(struct replacementPolicy));
ret->deinit = deinit;
ret->hit = hit;

View file

@ -1,7 +1,4 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <stasis/common.h>
#include <stasis/ringbuffer.h>
//#define TRACK_OFFSETS
@ -50,7 +47,7 @@ ringBufferLog_t * openLogRingBuffer(size_t size, lsn_t initialOffset) {
ret->size = size;
ret->start = initialOffset % size;
ret->end = initialOffset % size;
#ifdef TRACK_OFFSETS
ret->offset= initialOffset / size;
#endif
@ -61,14 +58,14 @@ void closeLogRingBuffer(ringBufferLog_t * log) {
free(log->buf);
free(log);
}
/**
/**
This function copies size bytes from the ringbuffer at offset
'offset'. size must be less than log->size.
It probably also should lie within the boundaries defined by start
and end, but this is optional.
*/
static void memcpyFromRingBuffer(byte * dest, ringBufferLog_t * log, lsn_t lsn, size_t size) {
static void memcpyFromRingBuffer(byte * dest, ringBufferLog_t * log, lsn_t lsn, size_t size) {
lsn_t offset = lsn_to_offset(log, lsn);
if(offset + size < log->size) {
memcpy(dest, &(log->buf[offset]), size);
@ -111,7 +108,7 @@ int ringBufferAppend(ringBufferLog_t * log, byte * dat, size_t size) {
log->end += size; // lsn_to_offset(log, log->end + size);
return 0;
}
lsn_t ringBufferAppendPosition(ringBufferLog_t * log) {
@ -142,7 +139,7 @@ static int stasis_ringbuffer_truncate(ringBufferLog_t * log, lsn_t lsn) {
#ifdef TRACK_OFFSETS
lsn_t newStart = lsn_to_offset(log, lsn);
if(newStart < lsn_to_offset(log, log->start)) { // buffer wrapped.
if(newStart < lsn_to_offset(log, log->start)) { // buffer wrapped.
log->offset += log->size;
}
#endif

View file

@ -3,6 +3,7 @@
#include <stasis/latches.h>
#include <math.h>
#include <stdio.h>
void acquired_lock(profile_tuple * tup, long spin_count) {
tup->sum_spin += spin_count;
@ -33,9 +34,9 @@ void print_profile_tuple(profile_tuple * tup) {
double std_spin = sqrt((((double)tup->sum_spin2) / ((double)tup->count)) - (mean_spin * mean_spin));
double mean_hold = ((double)tup->sum_hold)/ ((double)tup->count);
double std_hold = sqrt((((double)tup->sum_hold2) / ((double)tup->count)) - (mean_hold * mean_hold));
printf("{count=%ld spin[%1.4lf %1.4lf %0.0lf] held[%1.4lf %1.4lf %0.0lf]us}", tup->count,
mean_spin, std_spin, tup->max_spin,
printf("{count=%ld spin[%1.4lf %1.4lf %0.0lf] held[%1.4lf %1.4lf %0.0lf]us}", tup->count,
mean_spin, std_spin, tup->max_spin,
mean_hold, std_hold, tup->max_hold);
} else {
printf("{count=0}");

View file

@ -23,7 +23,7 @@
#include <stasis/blobManager.h> // XXX remove this, move Tread() to set.c
#include <assert.h>
#include <limits.h>
#include <stdio.h>
static int stasis_initted = 0;

View file

@ -75,24 +75,29 @@ terms specified in this license.
#include <limits.h>
#if STDC_HEADERS
# include <stdlib.h>
# include <string.h>
#elif HAVE_STRINGS_H
# include <strings.h>
#endif /*STDC_HEADERS*/
//#if STDC_HEADERS
//# include <stdlib.h>
//# include <string.h>
//#elif HAVE_STRINGS_H
//# include <strings.h>
//#endif /*STDC_HEADERS*/
//
//#if HAVE_UNISTD_H
//# include <unistd.h>
//#endif
//
//#if HAVE_ERRNO_H
//# include <errno.h>
//#endif /*HAVE_ERRNO_H*/
//#ifndef errno
// /* Some systems define this! */
//extern int errno;
//#endif
#if HAVE_UNISTD_H
# include <unistd.h>
#endif
#if HAVE_ERRNO_H
# include <errno.h>
#endif /*HAVE_ERRNO_H*/
#ifndef errno
/* Some systems #define this! */
extern int errno;
#endif
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<errno.h>
//#define byte unsigned char
//#define lsn_t long
@ -117,6 +122,10 @@ typedef int16_t pagetype_t;
/*#define PROFILE_LATCHES*/
/*#define NO_LATCHES */
#if _POSIX_C_SOURCE >= 199309L || _XOPEN_SOURCE >= 500
#define HAVE_FDATASYNC
#endif
#ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */
#define DEBUG(...) \

View file

@ -1,11 +1,12 @@
/**
@file
A few io wrapper functions to simplify file I/O code in LLADD.
/**
@file
A few io wrapper functions to simplify file I/O code in LLADD.
*/
#include <stasis/common.h>
#include <stdio.h>
BEGIN_C_DECLS

View file

@ -15,22 +15,11 @@
* Found the code at this url:
* http://www.cs.nmsu.edu/~jcook/Tools/pthreads/rw.c
*/
#include <stasis/common.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#ifndef __LIBDFA_RW_H
#define __LIBDFA_RW_H
#ifdef __cplusplus
# define BEGIN_C_DECLS extern "C" {
# define END_C_DECLS }
#else /* !__cplusplus */
# define BEGIN_C_DECLS
# define END_C_DECLS
#endif /* __cplusplus */
BEGIN_C_DECLS
typedef struct {