Fixed blob implementation (it was impossible to distinguish between a blob and a record of length 12), removed jbhash, and ported cht to linearHashNTA.

This commit is contained in:
Sears Russell 2005-01-31 01:29:52 +00:00
parent 7cf5fdee6e
commit bb69197a65
13 changed files with 181 additions and 548 deletions

View file

@ -132,12 +132,16 @@ terms specified in this license.
@see slotted.c, indirect.c
*/
#define INVALID_SLOT PAGE_SIZE
/** This constant is used as a placeholder to mark slot locations that contain blobs.
@see slotted.c, indirect.c, blobManager.c */
#define BLOB_SLOT (PAGE_SIZE + 1)
/* #define NORMAL_SLOT (PAGE_SIZE + 1)
#define BLOB_SLOT (PAGE_SIZE + 2)*/
/** @deprecated Replace all occurrances with sizeof(blob_record_t) */
#define BLOB_REC_SIZE sizeof(blob_record_t) /*12*/
/* * @ deprecated Replace all occurrances with sizeof(blob_record_t) */
//#define BLOB_REC_SIZE sizeof(blob_record_t) /*12*/
#define BLOB_THRESHOLD_SIZE (PAGE_SIZE-30)
#define BITS_PER_BYTE 8

View file

@ -1,176 +0,0 @@
/*---
This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
and that this notice is included verbatim in any distributions. No
written agreement, license, or royalty fee is required for any of the
authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
acquiring the software on behalf of the Department of Defense, the
software shall be classified as "Commercial Computer Software" and the
Government shall have only "Restricted Rights" as defined in Clause
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
/**
* header file for jb hash table structs
* Based on Peter Graf's pblhash, <http://mission.base.com/peter/source/>
* Jim Blomo <jim@xcf.berkeley.edu>
* $Id$
*/
#ifndef __JBHASH_H__
#define __JBHASH_H__
#include <lladd/common.h>
#include <lladd/transactional.h>
/* #define JB_HASHTABLE_SIZE 79 */
typedef struct {
recordid store;
size_t keylen;
size_t datlen;
recordid next;
} jbHashItem_t;
typedef struct {
int size;
recordid hashmap_record; /*[JB_HASHTABLE_SIZE]*/
recordid store;
jbHashItem_t *iterItem;
unsigned int iterIndex;
byte *iterData;
recordid* hashmap;
} jbHashTable_t;
/**
* jbHtCreate makes a new persistant hashtable
* @param xid transaction id
*
* @param size The number of hashbuckets. Currently, jbHash does not
* resize its bucket table, so it is important to set this number
* appropriately.
*
* @return pointer to hashtable, or NULL on error
*/
jbHashTable_t* jbHtCreate(int xid, int size);
/**
* jbHtValid determins if a hashtable pointer is valid
* @param xid transaction id
* @param ht hashtable you want to validate
* @return true if valid, false otherwise
*/
int jbHtValid(int xid, jbHashTable_t *ht);
/**
* Insert a key/value pair
* makes a SHALLOW COPY of the data to keep in durable storage
* will REPLACE an existing entry with the same key
* @param xid transaction id
* @param ht hashtable in which to insert
* @param key pointer to data serving as key
* @param keylen how much data to use from pointer
* @param dat data to insert
* @param datlen length data
* @return -1 on error, 0 on success
*/
int jbHtInsert(int xid, jbHashTable_t *ht, const byte *key, size_t keylen, const byte *dat, size_t datlen);
/**
* Lookup a value with a key
* @param xid transaction id
* @param ht hashtable in which to look
* @param key pointer to key data
* @param keylen length of key
* @param buf preallocated buffer in which to put data
* @return -1 if error occurs, including nothing found
*/
int jbHtLookup( int xid, jbHashTable_t *ht, const byte *key, size_t keylen, byte *buf );
/**
* Delete entry associated with key
* @param xid transaction id
* @param ht hashtable in which to delete
* @param key pointer to key data
* @param keylen length of key
* @param buf if non-NULL, preallocated space to copy data from deleted key
* @return -1 on errors or not found, 0 if existing entry was deleted
*/
int jbHtRemove( int xid, jbHashTable_t *ht, const byte *key, size_t keylen, byte *buf );
/**
* Start an iterator on the hash table
* @param xid transaction id
* @param ht hashtable
* @param buf preallocated space to put data
* @return -1 for no data or error, 0 on success
*/
int jbHtFirst( int xid, jbHashTable_t *ht, byte *buf );
/**
* iterates to the next item
* @param xid transaction id
* @param ht hashtable
* @param buf preallocated space to put data
* @return -1 for no data or error, 0 on success
*/
int jbHtNext( int xid, jbHashTable_t *ht, byte *buf );
/**
* get data for the place the iterator is currently in
* @param xid transaction id
* @param ht hashtable
* @param buf preallocated space to put data
* @return -1 for no data or error, 0 on success
*/
int jbHtCurrent(int xid, jbHashTable_t *ht, byte *buf);
/**
* get key for the place the iterator is currently in
* @param xid transaction id
* @param ht hashtable
* @param buf preallocated space to put key
* @return -1 for no data or error, 0 on success
*/
int jbHtCurrentKey(int xid, jbHashTable_t *ht, byte *buf);
/**
* Delete a hashtable
* table must be empty
* @param xid transaction id
* @param ht hashtable to delete
* @return 0 on success, -1 on error
*/
int jbHtDelete(int xid, jbHashTable_t *ht);
#endif

View file

@ -104,14 +104,26 @@ state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char
int xid_exists(int ht_xid, jbHashTable_t * xid_ht, StateMachine * stateMachine) {
int xid;
if (-1 == jbHtLookup(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)) {
int xid_exists(int ht_xid, recordid xid_ht, StateMachine * stateMachine) {
int * xid = 0;
/* if (-1 == jbHtLookup(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)) {
return 0;
} else {
assert(xid);
return xid;
} */
int size = ThashLookup(ht_xid, xid_ht,
(byte*)&(stateMachine->machine_id), sizeof(state_machine_id),
(byte**)&xid);
if(size != -1) {
assert(size == sizeof(int));
int ret = *xid;
free(xid);
return ret;
} else {
return 0;
}
}
DfaSet * cHtInit(int cht_type,
@ -143,8 +155,10 @@ DfaSet * cHtInit(int cht_type,
}
if(cht_type != CHT_CLIENT) {
chtApp_state->xid_ht = jbHtCreate(xid, 79);
chtApp_state->ht_ht = jbHtCreate(xid, 79);
/* chtApp_state->xid_ht = jbHtCreate(xid, 79);
chtApp_state->ht_ht = jbHtCreate(xid, 79); */
chtApp_state->xid_ht = ThashCreate(xid, sizeof(state_machine_id), sizeof(int));
chtApp_state->ht_ht = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
chtApp_state->ht_xid = Tbegin(); // !!!!
chtApp_state->next_hashTableId = 0;

View file

@ -40,7 +40,7 @@ permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include <libdfa/libdfa.h>
#include <pbl/jbhash.h>
//#include <pbl/jbhash.h>
#include "../../2pc/2pc.h"
#define CHT_COORDINATOR 1
#define CHT_SERVER 2
@ -146,8 +146,10 @@ int cHtGetXid(state_machine_id* xid, DfaSet * dfaSet);
/** The server side state for a CHT. */
typedef struct {
int ht_xid;
jbHashTable_t * xid_ht;
jbHashTable_t * ht_ht;
// jbHashTable_t * xid_ht;
// jbHashTable_t * ht_ht;
recordid xid_ht;
recordid ht_ht;
/** This gets incremented by the coordinator each time a new hashtable is allocated. */
int next_hashTableId;
} CHTAppState;

View file

@ -3,7 +3,7 @@
#include <assert.h>
#include <string.h>
#include <netinet/in.h>
#define setup_vars \
/*#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
jbHashTable_t * xid_ht = app_state_cht->xid_ht; \
@ -12,68 +12,110 @@ int ht_xid = app_state_cht->ht_xid;
int xid; \
int xid_exists = (-1 != jbHtLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid)); \
jbHashTable_t ht; \
int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))
int ht_exists = (-1 != jbHtLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte*) &ht))*/
#define setup_vars \
TwoPCAppState * app_state_2pc = ((TwoPCAppState*)(((DfaSet*)dfaSet)->app_setup)); \
CHTAppState * app_state_cht = app_state_2pc->app_state; \
recordid xid_ht = app_state_cht->xid_ht; \
recordid ht_ht = app_state_cht->ht_ht; \
int ht_xid = app_state_cht->ht_xid; \
int xid; \
int * xid_ptr = 0; \
int xid_exists = (-1 != ThashLookup(ht_xid, xid_ht,(byte*) &(stateMachine->machine_id), sizeof(state_machine_id), (byte**)&xid_ptr)); \
if(xid_ptr) { xid=*xid_ptr; free(xid_ptr); }\
recordid ht; \
recordid * ht_ptr = 0; \
int ht_exists = (-1 != ThashLookup(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),(byte**) &ht_ptr)); \
if(ht_ptr) { ht=*ht_ptr; free(ht_ptr) ;}
/** TODO For now, we ignore the possiblity that jbHashTable's functions
return error codes. Instead, we assume that they always
succeed. */
static state_name do_work(void * dfaSet, StateMachine * stateMachine, Message * m, char * from) {
int ret;
setup_vars;
int ret = 1;
switch(*requestType(m))
{
case CREATE:
{
jbHashTable_t * new = jbHtCreate(ht_xid, 79);
if(new != NULL) {
ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0);
} else {
ret = 0;
}
if(ret) {
printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
} else {
printf("Failed to insert new hash table slice!");
}
recordid new = ThashCreate(ht_xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
// jbHashTable_t * new = jbHtCreate(ht_xid, 79);
// if(new != NULL) {
ThashInsert(ht_xid, ht_ht,
(byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t),
(byte*)&new, sizeof(recordid));
// ret = (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)new, sizeof(jbHashTable_t)) >= 0);
// } else {
// ret = 0;
// }
// if(ret) {
printf("Created local slice of global hash table %d\n", (__header_ptr(m)->hashTable)); fflush(stdout);
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
// } else {
// printf("Failed to insert new hash table slice!");
// }
} break;
case INSERT:
{
if(!ht_exists) { printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(NULL); ret = 0; } else {
ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
if(!ht_exists) {
printf ("Hash table %d doesn't exist!\n", (__header_ptr(m)->hashTable)); fflush(stdout); ret = 0;
} else {
//ret = (jbHtInsert(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m)) >= 0);
ThashInsert(xid, ht, getKeyAddr(m), getKeyLength(m), getValAddr(m), getValLength(m));
printf("Insert: %d ht=%d (key length %d) %d -> %d\n", ret,
(__header_ptr(m)->hashTable), getKeyLength(m),
*(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(stdout);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case LOOKUP:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret, (__header_ptr(m)->hashTable), getKeyLength(m), *(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(NULL);
if(!ht_exists) {
printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0;
} else {
byte * new;
int valueLength = ThashLookup(xid, ht, getKeyAddr(m), getKeyLength(m), &new);
if(valueLength != -1) {
assert(valueLength <= getValLength(m));
memcpy(getValAddr(m), new, valueLength);
free(new);
} else {
ret = 0;
}
// ret = (jbHtLookup(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
printf("Lookup: %d ht=%d (key length %d) %d -> %d\n", ret,
(__header_ptr(m)->hashTable), getKeyLength(m),
*(int*)getKeyAddr(m), *(int*)getValAddr(m));
fflush(stdout);
}
} break;
case REMOVE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
if(!ht_exists) {
printf ("Hash table doesn't exist!\n"); fflush(stdout); ret = 0;
} else {
/** @todo no longer return old value on remove... */
ret = ThashRemove(xid, ht, getKeyAddr(m), getKeyLength(m));
// ret = (jbHtRemove(xid, &ht, getKeyAddr(m), getKeyLength(m), getValAddr(m)) >= 0);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
}
} break;
case DELETE:
{
if(!ht_exists) { printf ("Hash table doesn't exist!\n"); fflush(NULL); ret = 0; } else {
jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL);
(jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
ThashRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m));
// jbHtRemove(xid, ht_ht, getKeyAddr(m), getKeyLength(m), NULL);
// (jbHtInsert(ht_xid, ht_ht, (byte*)&(__header_ptr(m)->hashTable), sizeof(clusterHashTable_t), (byte*)&ht, sizeof(jbHashTable_t)));
/* ret = (jbHtDelete(xid, &ht) >= 0); */ /* Don't need this--jbHtDelete just frees the (stack!) pointer. */
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
@ -168,13 +210,10 @@ state_name veto_or_prepare_cht(void * dfaSet, StateMachine * stateMachine, Messa
printf("Tbegin failed; %d\n", xid);
} else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) {
printf("jbHtInsert failed.\n");
// } else if(jbHtInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int)) == -1) {
} else {
xid_exists = 1;
ThashInsert(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid, sizeof(int));
xid_exists = 1;
}
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
@ -205,7 +244,8 @@ state_name abort_cht(void * dfaSet, StateMachine * stateMachine, Message * m, ch
assert(xid_exists);
Tabort(xid); // !!!!
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
// jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
ThashRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
return 1;
@ -218,7 +258,8 @@ state_name commit_cht(void * dfaSet, StateMachine * stateMachine, Message * m, c
assert(xid_exists);
Tcommit(xid);
jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
// jbHtRemove(ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id), (byte*)&xid);
ThashRemove(app_state_cht->ht_xid, xid_ht, (byte*)&(stateMachine->machine_id), sizeof(state_machine_id));
Tcommit(app_state_cht->ht_xid);
app_state_cht->ht_xid = Tbegin();
/* }*/

View file

@ -205,7 +205,7 @@ recordid preAllocBlob(int xid, long blobSize) {
/* First in buffer manager. */
recordid rid = Talloc(xid, sizeof(blob_record_t));
recordid rid = Talloc(xid, BLOB_SLOT); //sizeof(blob_record_t));
rid.size = blobSize;
@ -223,7 +223,7 @@ recordid preAllocBlobFromPage(int xid, long page, long blobSize) {
/* First in buffer manager. */
recordid rid = TallocFromPage(xid, page, sizeof(blob_record_t));
recordid rid = TallocFromPage(xid, page, BLOB_SLOT); //sizeof(blob_record_t));
rid.size = blobSize;

View file

@ -41,7 +41,7 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat)
assert(*page_type_ptr(p) == SLOTTED_PAGE); */
if(rid.size >= BLOB_THRESHOLD_SIZE) {
if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) {
allocBlob(xid, p, lsn, rid);
} else {
slottedPostRalloc(p, lsn, rid);
@ -59,8 +59,9 @@ static int deoperate(int xid, Page * p, lsn_t lsn, recordid rid, const void * da
static int reoperate(int xid, Page *p, lsn_t lsn, recordid rid, const void * dat) {
if(rid.size >= BLOB_THRESHOLD_SIZE) {
rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */
if(rid.size >= BLOB_THRESHOLD_SIZE && rid.size != BLOB_SLOT) {
// rid.size = BLOB_REC_SIZE; /* Don't reuse blob space yet... */
rid.size = sizeof(blob_record_t);
}
slottedPostRalloc(p, lsn, rid);
@ -108,7 +109,7 @@ Operation getRealloc() {
recordid Talloc(int xid, long size) {
recordid rid;
Page * p = NULL;
if(size >= BLOB_THRESHOLD_SIZE) {
if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) {
/**@todo is it OK that Talloc doesn't pin the page when a blob is alloced?*/
rid = preAllocBlob(xid, size);
} else {
@ -126,8 +127,6 @@ recordid Talloc(int xid, long size) {
releasePage(p);
pthread_mutex_unlock(&talloc_mutex);
/*pthread_mutex_unlock(&talloc_mutex); */
}
return rid;
@ -138,7 +137,7 @@ recordid TallocFromPage(int xid, long page, long size) {
recordid rid;
Page * p = NULL;
if(size >= BLOB_THRESHOLD_SIZE) {
if(size >= BLOB_THRESHOLD_SIZE && size != BLOB_SLOT) {
rid = preAllocBlobFromPage(xid, page, size);
} else {
pthread_mutex_lock(&talloc_mutex);

View file

@ -324,7 +324,8 @@ int getRecordTypeUnlocked(int xid, Page * p, recordid rid) {
} else if(page_type == SLOTTED_PAGE) {
if(*numslots_ptr(p) <= rid.slot || *slot_ptr(p, rid.slot) == INVALID_SLOT /*|| *slot_length_ptr(p, rid.slot) == INVALID_SLOT*/) {
return UNINITIALIZED_PAGE;
} else if(*slot_length_ptr(p, rid.slot) == BLOB_REC_SIZE) {
// } else if(*slot_length_ptr(p, rid.slot) == BLOB_REC_SIZE) {
} else if (*slot_length_ptr(p, rid.slot) == BLOB_SLOT) {
return BLOB_RECORD;
} else {
return SLOTTED_RECORD;

View file

@ -2,7 +2,7 @@
#include "../page.h"
/*#include "../blobManager.h" */
#include "../blobManager.h" /** So that we can call sizeof(blob_record_t) */
#include "slotted.h"
#include <assert.h>
@ -161,8 +161,17 @@ int slottedFreespace(Page * page) {
recordid slottedPreRalloc(int xid, long size, Page ** pp) {
recordid ret;
assert(size < BLOB_THRESHOLD_SIZE);
int isBlob = 0;
if(size == BLOB_SLOT) {
isBlob = 1;
size = sizeof(blob_record_t);
}
assert(size < BLOB_THRESHOLD_SIZE);
/** @todo is ((unsigned int) foo) == -1 portable? Gotta love C.*/
if(lastFreepage == -1) {
@ -174,6 +183,7 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
*pp = loadPage(lastFreepage);
}
if(slottedFreespace(*pp) < size ) {
releasePage(*pp);
lastFreepage = TpageAlloc(xid);
@ -183,6 +193,10 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
ret = slottedRawRalloc(*pp, size);
if(isBlob) {
*slot_length_ptr(*pp, ret.slot) = BLOB_SLOT;
}
DEBUG("alloced rid = {%d, %d, %ld}\n", ret.page, ret.slot, ret.size);
return ret;
@ -190,7 +204,12 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
recordid ret;
int isBlob = 0;
if(size == BLOB_SLOT) {
isBlob = 1;
size = sizeof(blob_record_t);
}
*pp = loadPage(page);
if(slottedFreespace(*pp) < size) {
@ -208,7 +227,9 @@ recordid slottedPreRallocFromPage(int xid, long page, long size, Page **pp) {
}
assert(*page_type_ptr(*pp) == SLOTTED_PAGE);
ret = slottedRawRalloc(*pp, size);
if(isBlob) {
*slot_length_ptr(*pp, ret.slot) = BLOB_SLOT;
}
return ret;
}
@ -243,6 +264,13 @@ recordid slottedRawRalloc(Page * page, int size) {
static void __really_do_ralloc(Page * page, recordid rid) {
int freeSpace;
int isBlob = 0;
if(rid.size == BLOB_SLOT) {
isBlob = 1;
rid.size = sizeof(blob_record_t);
}
assert(rid.size > 0);
@ -267,7 +295,11 @@ static void __really_do_ralloc(Page * page, recordid rid) {
*slot_ptr(page, rid.slot) = freeSpace;
/* assert(!*slot_length_ptr(page, rid.slot) || (-1 == *slot_length_ptr(page, rid.slot)));*/
*slot_length_ptr(page, rid.slot) = rid.size;
if(isBlob) {
*slot_length_ptr(page, rid.slot = BLOB_SLOT);
} else {
*slot_length_ptr(page, rid.slot) = rid.size;
}
}
@ -343,7 +375,7 @@ void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
perror("memcpy");
@ -366,7 +398,7 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(buff, record_ptr(page, rid.slot), rid.size)) {
perror("memcpy");
@ -387,7 +419,7 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) {
perror("memcpy");
@ -407,7 +439,7 @@ void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const b
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length) || (slot_length >= PAGE_SIZE));
assert((rid.size == slot_length) || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
if(!memcpy(record_ptr(page, rid.slot), data, rid.size)) {
perror("memcpy");

View file

@ -1,4 +1,5 @@
# TODO: Doesn't build the pbl tests..
lib_LIBRARIES=libpbl.a
libpbl_a_SOURCES=pbl.c pblhash.c pblkf.c pblisam.c jbhash.c
libpbl_a_SOURCES=pbl.c pblhash.c pblkf.c pblisam.c
# jbhash.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -1,295 +0,0 @@
/*
* A durable, recoverable hashtable
* Based on Peter Graf's pblhash, <http://mission.base.com/peter/source/>
* Jim Blomo <jim@xcf.berkeley.edu>
* $Id$
*/
#include <assert.h>
#include <stdio.h>
#include <memory.h>
#include <stdlib.h>
#include <lladd/common.h>
#include <pbl/jbhash.h>
const recordid ZERO_RECORDID = {0,0,0};
jbHashTable_t* jbHtCreate(int xid, int size) {
jbHashTable_t *ht;
ht = (jbHashTable_t*)malloc(sizeof(jbHashTable_t));
if( ht ) {
recordid * hm = malloc(sizeof(recordid) * size);
if(hm) {
memset(hm, 0, sizeof(recordid)*size);
ht->size = size;
ht->store = Talloc(xid, sizeof(jbHashTable_t));
ht->hashmap_record = Talloc(xid, sizeof(recordid) * size);
ht->hashmap = NULL; /* Always should be NULL in the store, so that we know if we need to read it in */
Tset(xid, ht->store, ht);
ht->hashmap = hm;
Tset(xid, ht->hashmap_record, ht->hashmap);
ht->iterIndex = 0;
ht->iterData = NULL;
return ht;
} else {
free(ht);
return NULL;
}
}
return NULL;
}
int jbHtValid(int xid, jbHashTable_t *ht) {
int ret;
jbHashTable_t *test = (jbHashTable_t*)malloc(sizeof(jbHashTable_t));
Tread(xid, ht->store, test);
ret = ( test->store.size == ht->store.size
&& test->store.slot == ht->store.slot
&& test->store.page == ht->store.page );
/* TODO: Check hashmap_record? */
free(test);
return ret;
}
/**
* Hash function generator, taken directly from pblhash
*/
static int hash( const unsigned char * key, size_t keylen, int size ) {
int ret = 104729;
for( ; keylen-- > 0; key++ )
{
if( *key )
{
ret *= *key + keylen;
ret %= size;
}
}
return( ret % size );
}
/** Should be called the first time ht->hashmap is accessed by a library function.
Checks to see if the hashmap record has been read in, reads it if necessary, and then
returns a pointer to it. */
static recordid* _getHashMap(int xid, jbHashTable_t *ht) {
if(! ht->hashmap) {
ht->hashmap = malloc(sizeof(recordid) * ht->size);
Tread(xid, ht->hashmap_record, ht->hashmap);
}
return ht->hashmap;
}
int jbHtInsert(int xid, jbHashTable_t *ht, const byte *key, size_t keylen, const byte *dat, size_t datlen) {
int index = hash( key, keylen, ht->size);
recordid rid = _getHashMap(xid, ht)[index];
byte *newd;
jbHashItem_t newi;
if( rid.size == 0 ) { /* nothing with this hash has been inserted */
newi.store = Talloc(xid, sizeof(jbHashItem_t)+keylen+datlen);
newd = malloc(sizeof(jbHashItem_t)+keylen+datlen);
newi.keylen = keylen;
newi.datlen = datlen;
newi.next = ZERO_RECORDID;
memcpy(newd, &newi, sizeof(jbHashItem_t));
memcpy(newd+sizeof(jbHashItem_t), key, keylen);
memcpy(newd+sizeof(jbHashItem_t)+keylen, dat, datlen);
Tset(xid, newi.store, newd);
ht->hashmap[index] = newi.store;
/* Tset(xid, ht->store, ht); */
Tset(xid, ht->hashmap_record, ht->hashmap);
free(newd);
} else {
byte *item = NULL;
do {
free(item); /* NULL ignored by free */
item = malloc(rid.size);
Tread(xid, rid, item);
if( ((jbHashItem_t*)item)->keylen == keylen && !memcmp(key, item+sizeof(jbHashItem_t), keylen)) {
memcpy(item+sizeof(jbHashItem_t)+keylen, dat, ((jbHashItem_t*)item)->datlen);
Tset(xid, ((jbHashItem_t*)item)->store, item);
free(item);
return 0;
}
rid = ((jbHashItem_t*)item)->next; /* could go off end of list */
} while( ((jbHashItem_t*)item)->next.size != 0 );
/* now item is the tail */
newi.store = Talloc(xid, sizeof(jbHashItem_t)+keylen+datlen);
newd = malloc(sizeof(jbHashItem_t)+keylen+datlen);
newi.keylen = keylen;
newi.datlen = datlen;
newi.next = ZERO_RECORDID;
memcpy(newd, &newi, sizeof(jbHashItem_t));
memcpy(newd+sizeof(jbHashItem_t), key, keylen);
memcpy(newd+sizeof(jbHashItem_t)+keylen, dat, datlen);
Tset(xid, newi.store, newd);
((jbHashItem_t*)item)->next = newi.store;
Tset(xid, ((jbHashItem_t*)item)->store, item);
free(item);
free(newd);
}
return 0;
}
int jbHtLookup( int xid, jbHashTable_t *ht, const byte *key, size_t keylen, byte *buf ) {
int index = hash(key, keylen, ht->size);
recordid rid = _getHashMap(xid, ht)[index];
if( rid.size == 0 ) { /* nothing inserted with this hash */
return -1;
} else {
byte *item = NULL;
item = malloc(rid.size);
Tread(xid, rid, item);
for( ; !(((jbHashItem_t*)item)->keylen == keylen && !memcmp(key, item+sizeof(jbHashItem_t), keylen));
rid = ((jbHashItem_t*)item)->next ) {
if( rid.size == 0) { /* at the end of the list and not found */
return -1;
}
free(item);
item = malloc(rid.size);
Tread(xid, rid, item);
}
/* rid is what we want */
memcpy(buf, item+sizeof(jbHashItem_t)+((jbHashItem_t*)item)->keylen, ((jbHashItem_t*)item)->datlen);
free(item);
return 0;
}
return 0;
}
int jbHtRemove( int xid, jbHashTable_t *ht, const byte *key, size_t keylen, byte *buf ) {
int index = hash(key, keylen, ht->size);
recordid rid = _getHashMap(xid, ht)[index];
if( rid.size == 0) { /* nothing inserted with this hash */
return -1;
} else {
byte *del = malloc(rid.size);
Tread(xid, rid, del);
if( ((jbHashItem_t*)del)->keylen == keylen && !memcmp(key, del+sizeof(jbHashItem_t), keylen) ) {
/* the head is the entry to be removed */
if( buf ) {
memcpy( buf, del+sizeof(jbHashItem_t*)+keylen, ((jbHashItem_t*)del)->datlen);
}
ht->hashmap[index] = ((jbHashItem_t*)del)->next;
/* Tset(xid, ht->store, ht); */
Tset(xid, ht->hashmap_record, ht->hashmap);
/* TODO: dealloc rid */
free(del);
return 0;
} else {
byte * prevd = NULL;
while( ((jbHashItem_t*)del)->next.size ) {
free(prevd); /* free will ignore NULL args */
prevd = del;
rid = ((jbHashItem_t*)del)->next;
del = malloc(rid.size);
Tread(xid, rid, del);
if( ((jbHashItem_t*)del)->keylen == keylen && !memcmp(key, del+sizeof(jbHashItem_t), keylen) ) {
if( buf ) {
memcpy( buf, del+sizeof(jbHashItem_t)+keylen, ((jbHashItem_t*)del)->datlen);
}
((jbHashItem_t*)prevd)->next = ((jbHashItem_t*)del)->next;
Tset(xid, ((jbHashItem_t*)prevd)->store, prevd);
/* TODO: dealloc rid */
free(prevd);
free(del);
return 0;
}
}
/* could not find exact key */
free(prevd);
free(del);
return -1;
}
}
assert( 0 ); /* should not get here */
return -1;
}
int jbHtFirst( int xid, jbHashTable_t *ht, byte *buf ) {
ht->iterIndex = 0;
ht->iterData = NULL;
return jbHtNext( xid, ht, buf);
}
int jbHtNext( int xid, jbHashTable_t *ht, byte *buf ) {
_getHashMap(xid, ht);
if( ht->iterData && (((jbHashItem_t*)(ht->iterData))->next.size != 0) ) {
recordid next = ((jbHashItem_t*)(ht->iterData))->next;
free( ht->iterData );
ht->iterData = malloc(next.size);
Tread(xid, next, ht->iterData);
} else {
while(ht->iterIndex < ht->size) {
if( ht->hashmap[ht->iterIndex].size )
break;
else
ht->iterIndex++;
}
if( ht->iterIndex == ht->size) /* went through and found no data */
return -1;
free( ht->iterData );
ht->iterData = malloc(ht->hashmap[ht->iterIndex].size); /* to account for the last post incr */
Tread(xid, ht->hashmap[ht->iterIndex++], ht->iterData); /* increment for next round */
}
return jbHtCurrent(xid, ht, buf);
}
int jbHtCurrent(int xid, jbHashTable_t *ht, byte *buf) {
if( ht->iterData ) {
memcpy(buf, ht->iterData + sizeof(jbHashItem_t) + ((jbHashItem_t*)(ht->iterData))->keylen, ((jbHashItem_t*)(ht->iterData))->datlen);
return 0;
}
return -1;
}
int jbHtCurrentKey(int xid, jbHashTable_t *ht, byte *buf) {
if( ht->iterData ) {
memcpy(buf, ht->iterData + sizeof(jbHashItem_t), ((jbHashItem_t*)(ht->iterData))->keylen);
return 0;
}
return -1;
}
int jbHtDelete(int xid, jbHashTable_t *ht) {
/* deralloc ht->store */
if(ht->hashmap) { free(ht->hashmap); }
free(ht);
return 0;
}

View file

@ -12,27 +12,32 @@ mkdir 'sub3';
chdir 'coord';
system ("rm *.txt");
system ("../coordinator ../cluster.conf > /dev/null &");
#system ("../coordinator ../cluster.conf > /dev/null &");
system ("../coordinator ../cluster.conf 2>&1 | ../prepend coordinator &");
chdir '..';
chdir 'sub0';
system ("rm *.txt");
system ("../subordinate 0 ../cluster.conf > /dev/null &");
#system ("../subordinate 0 ../cluster.conf > /dev/null &");
system ("../subordinate 0 ../cluster.conf 2>&1 | ../prepend 'subordinate 0' &");
chdir '..';
chdir 'sub1';
system ("rm *.txt");
system ("../subordinate 1 ../cluster.conf > /dev/null &");
#system ("../subordinate 1 ../cluster.conf > /dev/null &");
system ("../subordinate 1 ../cluster.conf 2>&1 | ../prepend 'subordinate 1' &");
chdir '..';
chdir 'sub2';
system ("rm *.txt");
system ("../subordinate 2 ../cluster.conf > /dev/null &");
#system ("../subordinate 2 ../cluster.conf > /dev/null &");
system ("../subordinate 2 ../cluster.conf 2>&1 | ../prepend 'subordinate 2' &");
chdir '..';
chdir 'sub3';
system ("rm *.txt");
system ("../subordinate 3 ../cluster.conf > /dev/null &");
#system ("../subordinate 3 ../cluster.conf > /dev/null &");
system ("../subordinate 3 ../cluster.conf 2>&1 | ../prepend 'subordinate 3' &");
chdir '..';
system ("rm *.txt");
my $ret = system ("./client 2>&1");
my $ret = system ("./client 2>&1 | ./prepend client");
`killall coordinator`;
`killall subordinate`;

View file

@ -48,6 +48,7 @@ terms specified in this license.
#include "../../src/lladd/page/slotted.h"
#include "../../src/lladd/page/fixed.h"
#include "../../src/lladd/page/indirect.h"
#include "../../src/lladd/blobManager.h"
#include <lladd/bufferManager.h>
#include <lladd/transactional.h>
@ -448,7 +449,11 @@ START_TEST(pageTrecordTypeTest) {
bad.slot = slot.slot;
assert(TrecordType(xid, bad) == SLOTTED_RECORD);
/* Try to trick TrecordType by allocating a record that's the
same length as the slot used by blobs. */
recordid rid2 = Talloc(xid, sizeof(blob_record_t));
assert(TrecordType(xid, rid2) == SLOTTED_RECORD);
Tcommit(xid);
Tdeinit();