added simplistic support for log reordering

This commit is contained in:
Sears Russell 2009-03-16 08:11:49 +00:00
parent 45a2410a25
commit b1f44ab005
10 changed files with 296 additions and 18 deletions

View file

@ -14,6 +14,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
logger/inMemoryLog.c
logger/logHandle.c logger/logger2.c
logger/logMemory.c page/raw.c page/slotted.c page/lsnFree.c
logger/reorderingHandle.c
page/fixed.c compensations.c
operations/pageOperations.c page/indirect.c
operations/decrement.c operations/increment.c

View file

@ -10,6 +10,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
logger/filePool.c \
logger/inMemoryLog.c logger/logHandle.c logger/logger2.c \
logger/logMemory.c \
logger/reorderingHandle.c \
page/raw.c page/slotted.c page/lsnFree.c page/fixed.c compensations.c \
operations/pageOperations.c page/indirect.c operations/decrement.c \
operations/increment.c operations/prepare.c operations/set.c \

View file

@ -0,0 +1,101 @@
#include <stasis/transactional.h>
#include <stasis/logger/reorderingHandle.h>
#include <string.h>
static void* stasis_log_reordering_handle_worker(void * a) {
stasis_log_reordering_handle_t * h = (typeof(h))a;
pthread_mutex_lock(&h->mut);
while(h->cur_len || !h->closed) {
while(h->cur_len) {
size_t chunk_len = 0;
while(chunk_len < h->chunk_len && h->cur_len) {
LogEntry * e = LogUpdate(h->log,
h->l,
h->queue[h->cur_off].p,
h->queue[h->cur_off].op,
h->queue[h->cur_off].arg,
h->queue[h->cur_off].arg_size);
assert(e->xid != INVALID_XID);
chunk_len += sizeofLogEntry(e);
Page * p = h->queue[h->cur_off].p;
h->cur_len--;
h->cur_off = (h->cur_off+1)%h->max_len;
writelock(p->rwlatch,0);
stasis_page_lsn_write(e->xid, p, e->LSN);
unlock(p->rwlatch);
releasePage(p);
}
if(chunk_len > 0) {
lsn_t to_force = h->l->prevLSN;
pthread_mutex_unlock(&h->mut);
LogForce(h->log, to_force, LOG_FORCE_COMMIT);
pthread_mutex_lock(&h->mut);
}
}
pthread_cond_signal(&h->done);
if(!h->closed) { // XXX hack!
pthread_cond_wait(&h->ready, &h->mut);
}
}
pthread_mutex_unlock(&h->mut);
return 0;
}
void stasis_log_reordering_handle_flush(stasis_log_reordering_handle_t * h) {
pthread_mutex_lock(&h->mut);
while(h->cur_len > 0) {
pthread_cond_wait(&h->done, &h->mut);
}
pthread_mutex_unlock(&h->mut);
}
void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h) {
h->closed = 1;
pthread_cond_signal(&h->ready);
pthread_join(h->worker,0);
assert(h->cur_len == 0);
pthread_mutex_destroy(&h->mut);
pthread_cond_destroy(&h->ready);
pthread_cond_destroy(&h->done);
free(h->queue);
free(h);
}
stasis_log_reordering_handle_t *
stasis_log_reordering_handle_open(TransactionLog * l,
stasis_log_t* log,
size_t chunk_len,
size_t max_len) {
stasis_log_reordering_handle_t * ret = malloc(sizeof(*ret));
ret->l = l;
ret->log = log;
pthread_mutex_init(&ret->mut,0);
pthread_cond_init(&ret->done,0);
pthread_cond_init(&ret->ready,0);
ret->closed = 0;
ret->queue = malloc(sizeof(stasis_log_reordering_op_t)*max_len);
ret->chunk_len = chunk_len;
ret->max_len = max_len;
ret->cur_off = 0;
ret->cur_len = 0;
pthread_create(&ret->worker,0,stasis_log_reordering_handle_worker,ret);
return ret;
}
void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h,
Page * p,
unsigned int op,
const byte * arg,
size_t arg_size
) {
while(h->cur_len == h->max_len) {
pthread_cond_wait(&h->done, &h->mut);
}
intptr_t idx = (h->cur_off+h->cur_len)%h->max_len;
h->queue[idx].p = p;
h->queue[idx].op = op;
h->queue[idx].arg = malloc(arg_size);
memcpy(h->queue[idx].arg,arg,arg_size);
h->queue[idx].arg_size = arg_size;
h->cur_len++;
pthread_cond_signal(&h->ready);
}

View file

@ -1,6 +1,6 @@
#include <stasis/operations.h>
#include <stasis/page.h>
#include <stasis/logger/reorderingHandle.h>
#include <string.h>
static int op_lsn_free_set(const LogEntry *e, Page *p) {
if(*stasis_page_type_ptr(p) != SLOTTED_LSN_FREE_PAGE) { abort() ; }
@ -24,7 +24,8 @@ static int op_lsn_free_unset(const LogEntry *e, Page *p) {
memcpy(p->memAddr + a[0], b+a[1], a[1]);
return 0;
}
int TsetLSNFree(int xid, recordid rid, const void * dat) {
int TsetLsnFreeReorderable(int xid, stasis_log_reordering_handle_t * h,
recordid rid, const void * dat) {
Page * p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
rid = stasis_record_dereference(xid,p,rid);
@ -34,6 +35,7 @@ int TsetLSNFree(int xid, recordid rid, const void * dat) {
fflush(stderr);
abort();
unlock(p->rwlatch);
return 1;
} else {
rid.size = stasis_record_type_to_size(rid.size);
intptr_t sz = 2 * (sizeof(pageoff_t) + rid.size);
@ -51,12 +53,18 @@ int TsetLSNFree(int xid, recordid rid, const void * dat) {
stasis_record_read(xid, p, rid, b+rid.size);
unlock(p->rwlatch);
if(!h) {
Tupdate(xid,rid.page,buf,sz,OPERATION_SET_LSN_FREE);
} else {
TreorderableUpdate(xid,h,rid.page,buf,sz,OPERATION_SET_LSN_FREE);
}
free(buf);
}
return 0;
}
int TsetLsnFree(int xid, recordid rid, const void * dat) {
return TsetLsnFreeReorderable(xid, 0, rid, dat);
}
Operation getSetLsnFree() {
Operation o = {

View file

@ -28,7 +28,7 @@
#include <assert.h>
#include <limits.h>
static TransactionLog XactionTable[MAX_TRANSACTIONS];
TransactionLog XactionTable[MAX_TRANSACTIONS];
static int numActiveXactions = 0;
static int xidCount = 0;
@ -312,6 +312,35 @@ static compensated_function void TactionHelper(int xid,
unlock(p->rwlatch);
}
void TreorderableUpdate(int xid, void * hp, pageid_t page,
const void *dat, size_t datlen, int op) {
stasis_log_reordering_handle_t * h = (typeof(h))hp;
assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid);
Page * p = loadPage(xid, page);
try {
if(globalLockManager.writeLockPage) {
globalLockManager.writeLockPage(xid, p->id);
}
} end;
pthread_mutex_lock(&h->mut);
// e = LogUpdate(stasis_log_file, &XactionTable[xid % MAX_TRANSACTIONS],
// p, op, dat, datlen);
stasis_log_reordering_handle_append(h, p, op, dat, datlen);
LogEntry * e = allocUpdateLogEntry(-1, h->l->xid, op,
p ? p->id : INVALID_PAGE,
dat, datlen);
e->LSN = 0;
writelock(p->rwlatch,0);
doUpdate(e, p);
unlock(p->rwlatch);
pthread_mutex_unlock(&h->mut);
releasePage(p);
freeLogEntry(e);
}
compensated_function void TupdateStr(int xid, pageid_t page,
const char *dat, size_t datlen, int op) {
Tupdate(xid, page, dat, datlen, op);

View file

@ -55,6 +55,20 @@ terms specified in this license.
#ifndef __LOGGER2_H__
#define __LOGGER2_H__
#include <stasis/common.h>
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
*/
typedef struct TransactionLog {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
} TransactionLog;
typedef struct stasis_log_t stasis_log_t;
#include <stasis/operations.h>
/**
@ -64,21 +78,16 @@ terms specified in this license.
*/
typedef int (guard_fcn_t)(const LogEntry *, void *);
typedef struct stasis_log_t stasis_log_t;
typedef enum {
LOG_FORCE_COMMIT, LOG_FORCE_WAL
} stasis_log_force_mode_t;
/**
Contains the state needed by the logging layer to perform
operations on a transaction.
XXX TransactionTable should be private to transactional2.c!
*/
typedef struct {
int xid;
lsn_t prevLSN;
lsn_t recLSN;
} TransactionLog;
extern TransactionLog XactionTable[MAX_TRANSACTIONS];
/**
This is the log implementation that is being used.

View file

@ -0,0 +1,43 @@
#ifndef __STASIS_LOG_REORDERING_HANDLE_H
#define __STASIS_LOG_REORDERING_HANDLE_H
#include <stasis/common.h>
#include <stasis/logger/logger2.h>
typedef struct {
Page * p;
unsigned int op;
byte * arg;
size_t arg_size;
} stasis_log_reordering_op_t;
typedef struct stasis_log_reordering_handle_t {
TransactionLog *l;
stasis_log_t * log;
pthread_mutex_t mut;
pthread_cond_t done;
pthread_cond_t ready;
int closed;
pthread_t worker;
stasis_log_reordering_op_t * queue;
size_t chunk_len;
size_t max_len;
size_t cur_off;
size_t cur_len;
} stasis_log_reordering_handle_t;
#include <stasis/page.h>
void stasis_log_reordering_handle_flush(stasis_log_reordering_handle_t * h);
void stasis_log_reordering_handle_close(stasis_log_reordering_handle_t * h);
stasis_log_reordering_handle_t *
stasis_log_reordering_handle_open(TransactionLog * l,
stasis_log_t* log,
size_t chunk_len,
size_t max_len);
void stasis_log_reordering_handle_append(stasis_log_reordering_handle_t * h,
Page * p,
unsigned int op,
const byte * arg,
size_t arg_size);
#endif //__STASIS_LOG_REORDERING_HANDLE_H

View file

@ -1,6 +1,9 @@
#ifndef __LSN_FREE_SET_H
#define __LSN_FREE_SET_H
#include <stasis/logger/reorderingHandle.h>
Operation getSetLsnFree();
Operation getSetLsnFreeInverse();
int TsetLSNFree(int xid, recordid rid, const void *dat);
int TsetLsnFree(int xid, recordid rid, const void *dat);
int TsetLsnReorderable(int xid, stasis_log_reordering_handle_t * h,
recordid rid, const void *dat);
#endif //__LSN_FREE_SET_H

View file

@ -541,7 +541,6 @@ terms specified in this license.
#include "common.h"
#include "flags.h"
BEGIN_C_DECLS
//XXX doesn't belong here.
@ -615,6 +614,9 @@ compensated_function void Tupdate(int xid, pageid_t page,
*/
compensated_function void TupdateStr(int xid, pageid_t page,
const char *dat, size_t datlen, int op);
void TreorderableUpdate(int xid, void * h, pageid_t page,
const void * dat, size_t datlen, int op);
/**
* Read the value of a record.
*

View file

@ -50,6 +50,7 @@ terms specified in this license.
#include <stasis/page.h>
#define LOG_NAME "check_operations.log"
#include <stdio.h>
/**
@ -586,7 +587,7 @@ START_TEST(operation_lsn_free) {
int foo;
Tread(xid[i%2],rid[i], &foo);
assert(foo == 42);
TsetLSNFree(xid[i%2], rid[i], &i);
TsetLsnFree(xid[i%2], rid[i], &i);
Tread(xid[i%2],rid[i], &foo);
assert(foo == i);
}
@ -614,6 +615,85 @@ START_TEST(operation_lsn_free) {
} END_TEST
START_TEST(operation_reorderable) {
Tinit();
recordid rid[100];
{
int xid = Tbegin();
pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
stasis_slotted_lsn_free_initialize_page(p);
// XXX hack!
byte * old = malloc(PAGE_SIZE);
memcpy(old, p->memAddr, PAGE_SIZE);
int fortyTwo = 42;
for(int i = 0; i < 100; i++) {
rid[i] = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid[i]);
stasis_record_write(xid, p, -1, rid[i], (const byte*)&fortyTwo);
}
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);
memcpy(p->memAddr, old, PAGE_SIZE);
unlock(p->rwlatch);
releasePage(p);
TpageSet(xid, pid, new);
free(old);
free(new);
Tcommit(xid);
}
{
int xid[2];
xid[0] = Tbegin();
xid[1] = Tbegin();
stasis_log_reordering_handle_t * rh
= stasis_log_reordering_handle_open(
&XactionTable[xid[0]% MAX_TRANSACTIONS],
stasis_log_file,
100, // bytes (far too low!)
5 // log entries
);
for(int i = 0; i < 100; i++) {
int foo;
Tread(xid[i%2],rid[i], &foo);
assert(foo == 42);
if(i%2) {
TsetLsnFree(xid[i%2], rid[i], &i);
} else {
TsetLsnFreeReorderable(xid[i%2], rh, rid[i], &i);
}
Tread(xid[i%2],rid[i], &foo);
assert(foo == i);
}
stasis_log_reordering_handle_close(rh);
Tcommit(xid[0]);
Tabort(xid[1]);
}
Tdeinit();
Tinit();
{
int xid = Tbegin();
for(int i = 0; i < 100; i++) {
int foo;
Tread(xid, rid[i], &foo);
if(i%2) {
assert(foo == 42);
} else {
assert(foo == i);
}
}
Tcommit(xid);
}
Tdeinit();
} END_TEST
/**
Add suite declarations here
*/
@ -634,6 +714,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, operation_alloc_test);
tcase_add_test(tc, operation_array_list);
tcase_add_test(tc, operation_lsn_free);
tcase_add_test(tc, operation_reorderable);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);