more benchmarks; transactions can now be multithreaded
This commit is contained in:
parent
d413bb27f3
commit
83ad15b6d0
14 changed files with 455 additions and 294 deletions
|
@ -26,6 +26,7 @@ CREATE_EXECUTABLE(zeroCopy)
|
||||||
CREATE_EXECUTABLE(sequentialThroughput)
|
CREATE_EXECUTABLE(sequentialThroughput)
|
||||||
CREATE_EXECUTABLE(qos)
|
CREATE_EXECUTABLE(qos)
|
||||||
CREATE_EXECUTABLE(writeBack)
|
CREATE_EXECUTABLE(writeBack)
|
||||||
|
CREATE_EXECUTABLE(distributedLsnFree)
|
||||||
IF(CHECK_LIBRARY)
|
IF(CHECK_LIBRARY)
|
||||||
ADD_TEST(rose rose)
|
ADD_TEST(rose rose)
|
||||||
ENDIF(CHECK_LIBRARY)
|
ENDIF(CHECK_LIBRARY)
|
||||||
|
|
|
@ -10,7 +10,8 @@ noinst_PROGRAMS=lhtableThreaded naiveHash \
|
||||||
naiveMultiThreaded rawSet arrayListSet \
|
naiveMultiThreaded rawSet arrayListSet \
|
||||||
linearHashNTA linkedListNTA pageOrientedListNTA \
|
linearHashNTA linkedListNTA pageOrientedListNTA \
|
||||||
linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests \
|
linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests \
|
||||||
transitiveClosure zeroCopy sequentialThroughput rose roseTable qos writeBack
|
transitiveClosure zeroCopy sequentialThroughput rose roseTable qos writeBack \
|
||||||
|
distributedLsnFree
|
||||||
endif
|
endif
|
||||||
AM_CFLAGS=${GLOBAL_CFLAGS}
|
AM_CFLAGS=${GLOBAL_CFLAGS}
|
||||||
AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src
|
AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src
|
||||||
|
|
134
benchmarks/distributedLsnFree.c
Normal file
134
benchmarks/distributedLsnFree.c
Normal file
|
@ -0,0 +1,134 @@
|
||||||
|
#include "lsn_bench_common.h"
|
||||||
|
|
||||||
|
int main(int argc, char ** argv) {
|
||||||
|
unlink("storefile.txt");
|
||||||
|
unlink("logfile.txt");
|
||||||
|
char * mode = argv[1];
|
||||||
|
long long num_rids = atoll(argv[2]);
|
||||||
|
long long num_xacts = atoll(argv[3]);
|
||||||
|
long long writes_per_xact = atoll(argv[4]);
|
||||||
|
recordid * rids;
|
||||||
|
recordid * fast;
|
||||||
|
cached_addr * cache;
|
||||||
|
int writeback = !(strcmp(mode, "writeback")&&strcmp(mode,"writeback-net")&&strcmp(mode,"writeback-pipeline"));
|
||||||
|
// stasis_truncation_automatic = 0;
|
||||||
|
|
||||||
|
/*if(!(strcmp(mode, "writeback-pipeline"))) {
|
||||||
|
// pipelining likes big queues
|
||||||
|
// stasis_log_write_buffer_size = 50 * 1024 * 1024;
|
||||||
|
} else {
|
||||||
|
}*/
|
||||||
|
stasis_log_write_buffer_size = 50 * 1024 * 1024;
|
||||||
|
Tinit();
|
||||||
|
|
||||||
|
alloc_rids(num_rids,&rids,&fast);
|
||||||
|
int net = 0;
|
||||||
|
|
||||||
|
if(writeback) {
|
||||||
|
build_cache(rids,&cache,num_rids);
|
||||||
|
}
|
||||||
|
// XXX move above build_cache!
|
||||||
|
if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) {
|
||||||
|
net = 1;
|
||||||
|
emulate_remote_pages();
|
||||||
|
emulate_remote_log();
|
||||||
|
}
|
||||||
|
int num_workers = 100;
|
||||||
|
if(!strcmp(mode, "writeback-pipeline")) {
|
||||||
|
emulate_remote_pages();
|
||||||
|
net = 1;
|
||||||
|
num_workers = 10;
|
||||||
|
stasis_log_reordering_usleep_after_flush = net_latency * 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
// stasis_log_reordering_handle_t* handles[num_workers];
|
||||||
|
|
||||||
|
|
||||||
|
lsn_t last_lsn = 0;
|
||||||
|
for(long long x = 0; x < num_xacts; x++) {
|
||||||
|
int xid = Tbegin();
|
||||||
|
if(net && writeback) {
|
||||||
|
writeback_unit_of_work_arg a[num_workers];
|
||||||
|
pthread_t workers[num_workers];
|
||||||
|
for(int i =0 ; i < num_workers; i++) {
|
||||||
|
a[i].num_rids = num_rids;
|
||||||
|
a[i].rid_per_xact = writes_per_xact;
|
||||||
|
a[i].cache = cache;
|
||||||
|
a[i].done = 0;
|
||||||
|
a[i].xid = xid;
|
||||||
|
a[i].workerid = i;
|
||||||
|
a[i].iterationid = x;
|
||||||
|
a[i].divisor = num_workers;
|
||||||
|
pthread_mutex_init(&a[i].mut,0);
|
||||||
|
|
||||||
|
pthread_create(&workers[i], 0, writeback_unit_of_work, &a[i]);
|
||||||
|
}
|
||||||
|
for(int i =0; i < num_workers; i++) {
|
||||||
|
pthread_mutex_lock(&a[i].mut);
|
||||||
|
a[i].done = 1;
|
||||||
|
pthread_mutex_unlock(&a[i].mut);
|
||||||
|
}
|
||||||
|
for(int i =0 ; i < num_workers; i++) {
|
||||||
|
pthread_join(workers[i],0);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
/* if(writeback && net) {
|
||||||
|
for(int i = 0; i < num_workers; i++) {
|
||||||
|
handles[i] = stasis_log_reordering_handle_open(
|
||||||
|
&XactionTable[xid%MAX_TRANSACTIONS],
|
||||||
|
stasis_log_file,
|
||||||
|
(0.9*stasis_log_write_buffer_size)/num_workers,
|
||||||
|
//512*1024/ua->divisor, // 0.5 mb in log tail at once
|
||||||
|
1000000/num_workers, // max num outstanding requests
|
||||||
|
(50 * 1024 * 1024)/num_workers // max backlog in bytes
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} */
|
||||||
|
|
||||||
|
for(long long j = 0; j < writes_per_xact; j++) {
|
||||||
|
// long long idx = ((x*writes_per_xact)+j)%num_rids;
|
||||||
|
long long idx = j % num_rids;
|
||||||
|
// if(!(j % 100)) { printf("."); fflush(stdout); }
|
||||||
|
if(!(strcmp(mode, "normal")&&strcmp(mode, "normal-net"))) {
|
||||||
|
TsetLsnFree(xid, rids[idx], &j);
|
||||||
|
} else {
|
||||||
|
assert(writeback);
|
||||||
|
int old = cache[idx].val;
|
||||||
|
cache[idx].val = j;
|
||||||
|
if(net) {
|
||||||
|
/* TsetReorderableWriteBack(xid, handles[j%num_workers], cache[idx].pid,
|
||||||
|
cache[idx].off, cache[idx].len,&j,&old);
|
||||||
|
*/
|
||||||
|
} else {
|
||||||
|
last_lsn = TsetWriteBack(xid, cache[idx].pid,cache[idx].off,
|
||||||
|
cache[idx].len,&j,&old);
|
||||||
|
assert(last_lsn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* if(writeback && net) {
|
||||||
|
for(int j = 0; j < num_workers; j++) {
|
||||||
|
stasis_log_reordering_handle_close(handles[j]);
|
||||||
|
}
|
||||||
|
} */
|
||||||
|
}
|
||||||
|
if(net) {
|
||||||
|
last_lsn = XactionTable[xid%MAX_TRANSACTIONS].prevLSN;
|
||||||
|
}
|
||||||
|
Tcommit(xid);
|
||||||
|
}
|
||||||
|
// XXX hack; really, want to register upcall in buffermanager...
|
||||||
|
if(writeback) {
|
||||||
|
printf("starting writeback"); fflush(stdout);
|
||||||
|
assert(last_lsn);
|
||||||
|
for(long long i = 0; i < num_rids; i++) {
|
||||||
|
Page *p = loadPage(-1, rids[i].page);
|
||||||
|
writelock(p->rwlatch,0);
|
||||||
|
stasis_record_write(-1, p, last_lsn, rids[i], (byte*)&cache[i].val);
|
||||||
|
unlock(p->rwlatch);
|
||||||
|
releasePage(p);
|
||||||
|
releasePage(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Tdeinit();
|
||||||
|
}
|
265
benchmarks/lsn_bench_common.h
Normal file
265
benchmarks/lsn_bench_common.h
Normal file
|
@ -0,0 +1,265 @@
|
||||||
|
#include <stasis/transactional.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) {
|
||||||
|
*slow = malloc(num_rids * sizeof(**slow));
|
||||||
|
*fast = malloc((num_rids / 10) * sizeof(**fast));
|
||||||
|
|
||||||
|
int xid = Tbegin();
|
||||||
|
|
||||||
|
byte * old = malloc(PAGE_SIZE);
|
||||||
|
byte * new = malloc(PAGE_SIZE);
|
||||||
|
|
||||||
|
for(long long i = 0; i < num_rids; ) {
|
||||||
|
pageid_t pid = TpageAlloc(xid);
|
||||||
|
Page * p = loadPage(xid, pid);
|
||||||
|
writelock(p->rwlatch,0);
|
||||||
|
memcpy(old, p->memAddr, PAGE_SIZE);
|
||||||
|
stasis_slotted_lsn_free_initialize_page(p);
|
||||||
|
while(i < num_rids &&
|
||||||
|
(
|
||||||
|
((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size
|
||||||
|
== sizeof(int)
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
stasis_record_alloc_done(xid, p, (*slow)[i]);
|
||||||
|
if(!(i%10)) {
|
||||||
|
(*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int));
|
||||||
|
if((*fast)[i/10].size!=sizeof(int)) {
|
||||||
|
break; // don't increment i
|
||||||
|
}
|
||||||
|
stasis_record_alloc_done(xid, p, (*fast)[i/10]);
|
||||||
|
}
|
||||||
|
assert((*slow)[i].size != -1);
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
memcpy(new, p->memAddr, PAGE_SIZE);
|
||||||
|
memcpy(p->memAddr, old, PAGE_SIZE);
|
||||||
|
unlock(p->rwlatch);
|
||||||
|
releasePage(p);
|
||||||
|
TpageSet(xid, pid, new);
|
||||||
|
}
|
||||||
|
free(old);
|
||||||
|
free(new);
|
||||||
|
|
||||||
|
Tcommit(xid);
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pageid_t pid;
|
||||||
|
pageoff_t off;
|
||||||
|
pageoff_t len;
|
||||||
|
int val;
|
||||||
|
} cached_addr;
|
||||||
|
|
||||||
|
void build_cache(recordid * rids, cached_addr** cache, long long count) {
|
||||||
|
*cache = malloc (sizeof(**cache) * count);
|
||||||
|
lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file);
|
||||||
|
for(long long i = 0; i < count; i++) {
|
||||||
|
(*cache)[i].pid = rids[i].page;
|
||||||
|
|
||||||
|
Page * p = loadPage(-1, (*cache)[i].pid);
|
||||||
|
readlock(p->rwlatch,0);
|
||||||
|
byte * b = stasis_record_write_begin(-1, p, rids[i]);
|
||||||
|
(*cache)[i].off = b - p->memAddr;
|
||||||
|
stasis_record_write_done(-1, p, rids[i], b);
|
||||||
|
stasis_page_lsn_write(-1, p, log_trunc);
|
||||||
|
(*cache)[i].len = stasis_record_type_to_size(rids[i].size);
|
||||||
|
(*cache)[i].val = 0;
|
||||||
|
unlock(p->rwlatch);
|
||||||
|
// releasePage(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
static int net_latency = 2;
|
||||||
|
static byte * (*origWrite)(int xid, Page *p, recordid rid);
|
||||||
|
byte * slowWrite(int xid, Page *p, recordid rid) {
|
||||||
|
usleep(net_latency * 1000);
|
||||||
|
return origWrite(xid,p,rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const byte * (*origRead)(int xid, Page *p, recordid rid);
|
||||||
|
const byte * slowRead(int xid, Page *p, recordid rid) {
|
||||||
|
usleep(net_latency * 1000);
|
||||||
|
return origRead(xid,p,rid);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int (*original_write_entry) (struct stasis_log_t* log, LogEntry * e);
|
||||||
|
int my_write_entry(struct stasis_log_t* log, LogEntry *e) {
|
||||||
|
usleep(net_latency * 1000);
|
||||||
|
return original_write_entry(log,e);
|
||||||
|
}
|
||||||
|
|
||||||
|
void emulate_remote_log() {
|
||||||
|
original_write_entry = stasis_log_file->write_entry;
|
||||||
|
stasis_log_file->write_entry = my_write_entry;
|
||||||
|
}
|
||||||
|
void emulate_remote_pages() {
|
||||||
|
origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite;
|
||||||
|
origRead = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead;
|
||||||
|
// xxx a bit of cheating; don't pay latency for lsn write
|
||||||
|
// (could amortize w/ recordWrite)
|
||||||
|
stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite = slowWrite;
|
||||||
|
stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead = slowRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/////////// Background workers for parallelizing enqueues to slow logs
|
||||||
|
typedef struct {
|
||||||
|
long long num_rids;
|
||||||
|
long long rid_per_xact;
|
||||||
|
recordid * rids;
|
||||||
|
int done;
|
||||||
|
pthread_mutex_t mut;
|
||||||
|
} bulk_worker_args;
|
||||||
|
|
||||||
|
void* normal_worker(void * ap) {
|
||||||
|
bulk_worker_args * a = ap;
|
||||||
|
pthread_mutex_lock(&a->mut);
|
||||||
|
for(int i = 0; !a->done; i++) {
|
||||||
|
pthread_mutex_unlock(&a->mut);
|
||||||
|
|
||||||
|
struct timeval tv;
|
||||||
|
gettimeofday(&tv, 0);
|
||||||
|
long long start = tv.tv_usec + tv.tv_sec * 1000000;
|
||||||
|
|
||||||
|
int xid = Tbegin();
|
||||||
|
for(int j = 0; j < a->rid_per_xact; j++) {
|
||||||
|
int val = i * a->rid_per_xact + j;
|
||||||
|
Tset(xid, a->rids[j%a->num_rids], &val);
|
||||||
|
}
|
||||||
|
Tcommit(xid);
|
||||||
|
|
||||||
|
gettimeofday(&tv, 0);
|
||||||
|
long long stop = tv.tv_usec + tv.tv_sec * 1000000;
|
||||||
|
printf("low(ms),%lld\n", stop-start);
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&a->mut);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&a->mut);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
}
|
||||||
|
typedef struct {
|
||||||
|
long long num_rids;
|
||||||
|
long long rid_per_xact;
|
||||||
|
cached_addr * cache;
|
||||||
|
int done;
|
||||||
|
int xid;
|
||||||
|
int workerid;
|
||||||
|
int iterationid;
|
||||||
|
int divisor;
|
||||||
|
pthread_mutex_t mut;
|
||||||
|
} writeback_unit_of_work_arg;
|
||||||
|
void * writeback_unit_of_work(void * ap) {
|
||||||
|
writeback_unit_of_work_arg * ua = ap;
|
||||||
|
|
||||||
|
stasis_log_reordering_handle_t * rh
|
||||||
|
= stasis_log_reordering_handle_open(
|
||||||
|
&XactionTable[ua->xid%MAX_TRANSACTIONS],
|
||||||
|
stasis_log_file,
|
||||||
|
(0.9*stasis_log_write_buffer_size)/ua->divisor,
|
||||||
|
//512*1024/ua->divisor, // 0.5 mb in log tail at once
|
||||||
|
1000000/ua->divisor, // max num outstanding requests
|
||||||
|
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
|
||||||
|
);
|
||||||
|
/*
|
||||||
|
stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS],
|
||||||
|
stasis_log_file,
|
||||||
|
(stasis_log_write_buffer_size * 0.25)/ua->divisor,
|
||||||
|
//512*1024/ua->divisor, // 0.5 mb in log tail at once
|
||||||
|
1000000/ua->divisor, // max num outstanding requests
|
||||||
|
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
|
||||||
|
);*/
|
||||||
|
for(int j = 0; j < ua->rid_per_xact/ua->divisor; j++) {
|
||||||
|
long long idx = (ua->workerid+(ua->divisor*j)) % ua->num_rids;
|
||||||
|
int old = ua->cache[idx].val;
|
||||||
|
ua->cache[idx].val = (ua->workerid+(ua->divisor*j));
|
||||||
|
TsetReorderableWriteBack(ua->xid, rh, ua->cache[idx].pid,
|
||||||
|
ua->cache[idx].off, ua->cache[idx].len,&ua->cache[idx].val,&old);
|
||||||
|
// TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val);
|
||||||
|
|
||||||
|
}
|
||||||
|
stasis_log_reordering_handle_close(rh);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int xid;
|
||||||
|
int n; // which worker am i?
|
||||||
|
int i; // which iteration is this?
|
||||||
|
int divisor;
|
||||||
|
bulk_worker_args * a;
|
||||||
|
} unit_of_work_arg;
|
||||||
|
void * bg_unit_of_work(void * ap) {
|
||||||
|
unit_of_work_arg * ua = ap;
|
||||||
|
bulk_worker_args * a = ua->a;
|
||||||
|
|
||||||
|
stasis_log_reordering_handle_t * rh
|
||||||
|
= stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS],
|
||||||
|
stasis_log_file,
|
||||||
|
(stasis_log_write_buffer_size * 0.25)/ua->divisor,
|
||||||
|
//512*1024/ua->divisor, // 0.5 mb in log tail at once
|
||||||
|
1000000/ua->divisor, // max num outstanding requests
|
||||||
|
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
|
||||||
|
);
|
||||||
|
for(int j = 0; j < a->rid_per_xact/ua->divisor; j++) {
|
||||||
|
int val = ua->i * (a->rid_per_xact/ua->divisor) + j;
|
||||||
|
TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val);
|
||||||
|
}
|
||||||
|
stasis_log_reordering_handle_close(rh);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* bg_worker(void * ap) {
|
||||||
|
bulk_worker_args * a = ap;
|
||||||
|
pthread_mutex_lock(&a->mut);
|
||||||
|
for(int i = 0; !a->done; i++) {
|
||||||
|
pthread_mutex_unlock(&a->mut);
|
||||||
|
|
||||||
|
struct timeval tv;
|
||||||
|
gettimeofday(&tv, 0);
|
||||||
|
long long start = tv.tv_usec + tv.tv_sec * 1000000;
|
||||||
|
|
||||||
|
int xid = Tbegin();
|
||||||
|
if(stasis_log_file->write_entry == my_write_entry) {
|
||||||
|
// based on tweaking; also, normal-net is ~ 100x slower than nromal
|
||||||
|
int num_worker = 100;
|
||||||
|
pthread_t workers[num_worker];
|
||||||
|
unit_of_work_arg args[num_worker];
|
||||||
|
for(int w = 0; w < num_worker; w++) {
|
||||||
|
args[i].xid = xid;
|
||||||
|
args[i].n = w;
|
||||||
|
args[i].i = i;
|
||||||
|
args[i].divisor = num_worker;
|
||||||
|
args[i].a = a;
|
||||||
|
pthread_create(&workers[w], 0, bg_unit_of_work, &(args[i]));
|
||||||
|
}
|
||||||
|
for(int w = 0; w < num_worker; w++) {
|
||||||
|
pthread_join(workers[w], 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
unit_of_work_arg unit_arg = {
|
||||||
|
xid,
|
||||||
|
0,
|
||||||
|
i,
|
||||||
|
1,
|
||||||
|
ap
|
||||||
|
};
|
||||||
|
bg_unit_of_work(&unit_arg);
|
||||||
|
}
|
||||||
|
Tcommit(xid);
|
||||||
|
|
||||||
|
gettimeofday(&tv, 0);
|
||||||
|
long long stop = tv.tv_usec + tv.tv_sec * 1000000;
|
||||||
|
printf("low(ms),%lld\n", stop-start);
|
||||||
|
fflush(stdout);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&a->mut);
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&a->mut);
|
||||||
|
return 0;
|
||||||
|
}
|
183
benchmarks/qos.c
183
benchmarks/qos.c
|
@ -1,175 +1,4 @@
|
||||||
#include <stasis/transactional.h>
|
#include "lsn_bench_common.h"
|
||||||
#include <string.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
|
|
||||||
void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) {
|
|
||||||
*slow = malloc(num_rids * sizeof(**slow));
|
|
||||||
*fast = malloc((num_rids / 10) * sizeof(**fast));
|
|
||||||
|
|
||||||
int xid = Tbegin();
|
|
||||||
|
|
||||||
byte * old = malloc(PAGE_SIZE);
|
|
||||||
byte * new = malloc(PAGE_SIZE);
|
|
||||||
|
|
||||||
for(long long i = 0; i < num_rids; ) {
|
|
||||||
pageid_t pid = TpageAlloc(xid);
|
|
||||||
Page * p = loadPage(xid, pid);
|
|
||||||
writelock(p->rwlatch,0);
|
|
||||||
memcpy(old, p->memAddr, PAGE_SIZE);
|
|
||||||
stasis_slotted_lsn_free_initialize_page(p);
|
|
||||||
while(i < num_rids &&
|
|
||||||
(
|
|
||||||
((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size
|
|
||||||
== sizeof(int)
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
stasis_record_alloc_done(xid, p, (*slow)[i]);
|
|
||||||
if(!(i%10)) {
|
|
||||||
(*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int));
|
|
||||||
if((*fast)[i/10].size!=sizeof(int)) {
|
|
||||||
break; // don't increment i
|
|
||||||
}
|
|
||||||
stasis_record_alloc_done(xid, p, (*fast)[i/10]);
|
|
||||||
}
|
|
||||||
assert((*slow)[i].size != -1);
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
memcpy(new, p->memAddr, PAGE_SIZE);
|
|
||||||
memcpy(p->memAddr, old, PAGE_SIZE);
|
|
||||||
unlock(p->rwlatch);
|
|
||||||
releasePage(p);
|
|
||||||
TpageSet(xid, pid, new);
|
|
||||||
}
|
|
||||||
free(old);
|
|
||||||
free(new);
|
|
||||||
|
|
||||||
Tcommit(xid);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
long long num_rids;
|
|
||||||
long long rid_per_xact;
|
|
||||||
recordid * rids;
|
|
||||||
int done;
|
|
||||||
pthread_mutex_t mut;
|
|
||||||
} bulk_worker_args;
|
|
||||||
|
|
||||||
static int (*original_write_entry) (struct stasis_log_t* log, LogEntry * e);
|
|
||||||
|
|
||||||
static int net_latency = 2;
|
|
||||||
|
|
||||||
|
|
||||||
int my_write_entry(struct stasis_log_t* log, LogEntry *e) {
|
|
||||||
usleep(net_latency * 1000);
|
|
||||||
return original_write_entry(log,e);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* normal_worker(void * ap) {
|
|
||||||
bulk_worker_args * a = ap;
|
|
||||||
pthread_mutex_lock(&a->mut);
|
|
||||||
for(int i = 0; !a->done; i++) {
|
|
||||||
pthread_mutex_unlock(&a->mut);
|
|
||||||
|
|
||||||
struct timeval tv;
|
|
||||||
gettimeofday(&tv, 0);
|
|
||||||
long long start = tv.tv_usec + tv.tv_sec * 1000000;
|
|
||||||
|
|
||||||
int xid = Tbegin();
|
|
||||||
for(int j = 0; j < a->rid_per_xact; j++) {
|
|
||||||
int val = i * a->rid_per_xact + j;
|
|
||||||
Tset(xid, a->rids[j%a->num_rids], &val);
|
|
||||||
}
|
|
||||||
Tcommit(xid);
|
|
||||||
|
|
||||||
gettimeofday(&tv, 0);
|
|
||||||
long long stop = tv.tv_usec + tv.tv_sec * 1000000;
|
|
||||||
printf("low(ms),%lld\n", stop-start);
|
|
||||||
fflush(stdout);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&a->mut);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&a->mut);
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
}
|
|
||||||
typedef struct {
|
|
||||||
int xid;
|
|
||||||
int n; // which worker am i?
|
|
||||||
int i; // which iteration is this?
|
|
||||||
int divisor;
|
|
||||||
bulk_worker_args * a;
|
|
||||||
} unit_of_work_arg;
|
|
||||||
void * bg_unit_of_work(void * ap) {
|
|
||||||
unit_of_work_arg * ua = ap;
|
|
||||||
bulk_worker_args * a = ua->a;
|
|
||||||
|
|
||||||
stasis_log_reordering_handle_t * rh
|
|
||||||
= stasis_log_reordering_handle_open(&XactionTable[ua->xid%MAX_TRANSACTIONS],
|
|
||||||
stasis_log_file,
|
|
||||||
(stasis_log_write_buffer_size * 0.25)/ua->divisor,
|
|
||||||
//512*1024/ua->divisor, // 0.5 mb in log tail at once
|
|
||||||
1000000/ua->divisor, // max num outstanding requests
|
|
||||||
(50 * 1024 * 1024)/ua->divisor // max backlog in bytes
|
|
||||||
);
|
|
||||||
for(int j = 0; j < a->rid_per_xact/ua->divisor; j++) {
|
|
||||||
int val = ua->i * (a->rid_per_xact/ua->divisor) + j;
|
|
||||||
TsetReorderable(ua->xid, rh, a->rids[(j*ua->divisor+ua->n)%a->num_rids], &val);
|
|
||||||
}
|
|
||||||
stasis_log_reordering_handle_close(rh);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* bg_worker(void * ap) {
|
|
||||||
bulk_worker_args * a = ap;
|
|
||||||
pthread_mutex_lock(&a->mut);
|
|
||||||
for(int i = 0; !a->done; i++) {
|
|
||||||
pthread_mutex_unlock(&a->mut);
|
|
||||||
|
|
||||||
struct timeval tv;
|
|
||||||
gettimeofday(&tv, 0);
|
|
||||||
long long start = tv.tv_usec + tv.tv_sec * 1000000;
|
|
||||||
|
|
||||||
int xid = Tbegin();
|
|
||||||
if(stasis_log_file->write_entry == my_write_entry) {
|
|
||||||
// based on tweaking; also, normal-net is ~ 100x slower than nromal
|
|
||||||
int num_worker = 100;
|
|
||||||
pthread_t workers[num_worker];
|
|
||||||
unit_of_work_arg args[num_worker];
|
|
||||||
for(int w = 0; w < num_worker; w++) {
|
|
||||||
args[i].xid = xid;
|
|
||||||
args[i].n = w;
|
|
||||||
args[i].i = i;
|
|
||||||
args[i].divisor = num_worker;
|
|
||||||
args[i].a = a;
|
|
||||||
pthread_create(&workers[w], 0, bg_unit_of_work, &(args[i]));
|
|
||||||
}
|
|
||||||
for(int w = 0; w < num_worker; w++) {
|
|
||||||
pthread_join(workers[w], 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
unit_of_work_arg unit_arg = {
|
|
||||||
xid,
|
|
||||||
0,
|
|
||||||
i,
|
|
||||||
1,
|
|
||||||
ap
|
|
||||||
};
|
|
||||||
bg_unit_of_work(&unit_arg);
|
|
||||||
}
|
|
||||||
Tcommit(xid);
|
|
||||||
|
|
||||||
gettimeofday(&tv, 0);
|
|
||||||
long long stop = tv.tv_usec + tv.tv_sec * 1000000;
|
|
||||||
printf("low(ms),%lld\n", stop-start);
|
|
||||||
fflush(stdout);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&a->mut);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&a->mut);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
char * mode = argv[1];
|
char * mode = argv[1];
|
||||||
|
@ -188,7 +17,7 @@ int main(int argc, char** argv) {
|
||||||
// XXX instead of overriding this, set tail of priority log to 80%
|
// XXX instead of overriding this, set tail of priority log to 80%
|
||||||
// stasis log buf or something...
|
// stasis log buf or something...
|
||||||
|
|
||||||
stasis_log_write_buffer_size = 50 * 1024 * 1024;
|
// stasis_log_write_buffer_size = 50 * 1024 * 1024;
|
||||||
|
|
||||||
printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3],
|
printf("%s %s %s %s %lld\n", argv[0], argv[1], argv[2], argv[3],
|
||||||
stasis_log_write_buffer_size);
|
stasis_log_write_buffer_size);
|
||||||
|
@ -205,12 +34,10 @@ int main(int argc, char** argv) {
|
||||||
} else if (!strcmp(mode, "normal")) {
|
} else if (!strcmp(mode, "normal")) {
|
||||||
pthread_create(&worker, 0, normal_worker, &a);
|
pthread_create(&worker, 0, normal_worker, &a);
|
||||||
} else if (!strcmp(mode, "normal-net")) {
|
} else if (!strcmp(mode, "normal-net")) {
|
||||||
original_write_entry = stasis_log_file->write_entry;
|
emulate_remote_log();
|
||||||
stasis_log_file->write_entry = my_write_entry;
|
|
||||||
pthread_create(&worker, 0, normal_worker, &a);
|
pthread_create(&worker, 0, normal_worker, &a);
|
||||||
} else if (!strcmp(mode, "bg-net")) {
|
} else if (!strcmp(mode, "bg-net")) {
|
||||||
original_write_entry = stasis_log_file->write_entry;
|
emulate_remote_log();
|
||||||
stasis_log_file->write_entry = my_write_entry;
|
|
||||||
pthread_create(&worker, 0, bg_worker, &a);
|
pthread_create(&worker, 0, bg_worker, &a);
|
||||||
} else {
|
} else {
|
||||||
assert(!strcmp(mode, "bg"));
|
assert(!strcmp(mode, "bg"));
|
||||||
|
@ -218,7 +45,6 @@ int main(int argc, char** argv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sleep(10);
|
sleep(10);
|
||||||
// sleep 10 (reach steady state)
|
|
||||||
|
|
||||||
// run benchmark here
|
// run benchmark here
|
||||||
for(int i = 0; i < 60; i++) {
|
for(int i = 0; i < 60; i++) {
|
||||||
|
@ -237,7 +63,6 @@ int main(int argc, char** argv) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pthread_mutex_lock(&a.mut);
|
pthread_mutex_lock(&a.mut);
|
||||||
a.done = 1;
|
a.done = 1;
|
||||||
pthread_mutex_unlock(&a.mut);
|
pthread_mutex_unlock(&a.mut);
|
||||||
|
|
|
@ -1,86 +1,4 @@
|
||||||
#include <stasis/transactional.h>
|
#include "lsn_bench_common.h"
|
||||||
#include <string.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) {
|
|
||||||
*slow = malloc(num_rids * sizeof(**slow));
|
|
||||||
*fast = malloc((num_rids / 10) * sizeof(**fast));
|
|
||||||
|
|
||||||
int xid = Tbegin();
|
|
||||||
|
|
||||||
byte * old = malloc(PAGE_SIZE);
|
|
||||||
byte * new = malloc(PAGE_SIZE);
|
|
||||||
|
|
||||||
for(long long i = 0; i < num_rids; ) {
|
|
||||||
pageid_t pid = TpageAlloc(xid);
|
|
||||||
Page * p = loadPage(xid, pid);
|
|
||||||
writelock(p->rwlatch,0);
|
|
||||||
memcpy(old, p->memAddr, PAGE_SIZE);
|
|
||||||
stasis_slotted_lsn_free_initialize_page(p);
|
|
||||||
while(i < num_rids &&
|
|
||||||
(
|
|
||||||
((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size
|
|
||||||
== sizeof(int)
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
stasis_record_alloc_done(xid, p, (*slow)[i]);
|
|
||||||
if(!(i%10)) {
|
|
||||||
(*fast)[i/10] = stasis_record_alloc_begin(xid, p, sizeof(int));
|
|
||||||
if((*fast)[i/10].size!=sizeof(int)) {
|
|
||||||
break; // don't increment i
|
|
||||||
}
|
|
||||||
stasis_record_alloc_done(xid, p, (*fast)[i/10]);
|
|
||||||
}
|
|
||||||
assert((*slow)[i].size != -1);
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
memcpy(new, p->memAddr, PAGE_SIZE);
|
|
||||||
memcpy(p->memAddr, old, PAGE_SIZE);
|
|
||||||
unlock(p->rwlatch);
|
|
||||||
releasePage(p);
|
|
||||||
TpageSet(xid, pid, new);
|
|
||||||
}
|
|
||||||
free(old);
|
|
||||||
free(new);
|
|
||||||
|
|
||||||
Tcommit(xid);
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
pageid_t pid;
|
|
||||||
pageoff_t off;
|
|
||||||
pageoff_t len;
|
|
||||||
int val;
|
|
||||||
} cached_addr;
|
|
||||||
|
|
||||||
void build_cache(recordid * rids, cached_addr** cache, long long count) {
|
|
||||||
*cache = malloc (sizeof(**cache) * count);
|
|
||||||
lsn_t log_trunc = stasis_log_file->truncation_point(stasis_log_file);
|
|
||||||
for(long long i = 0; i < count; i++) {
|
|
||||||
(*cache)[i].pid = rids[i].page;
|
|
||||||
|
|
||||||
Page * p = loadPage(-1, (*cache)[i].pid);
|
|
||||||
readlock(p->rwlatch,0);
|
|
||||||
byte * b = stasis_record_write_begin(-1, p, rids[i]);
|
|
||||||
(*cache)[i].off = b - p->memAddr;
|
|
||||||
stasis_record_write_done(-1, p, rids[i], b);
|
|
||||||
stasis_page_lsn_write(-1, p, log_trunc);
|
|
||||||
(*cache)[i].len = stasis_record_type_to_size(rids[i].size);
|
|
||||||
(*cache)[i].val = 0;
|
|
||||||
unlock(p->rwlatch);
|
|
||||||
// releasePage(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int net_latency = 2;
|
|
||||||
static byte * (*origWrite)(int xid, Page *p, recordid rid);
|
|
||||||
static byte * slowWrite(int xid, Page *p, recordid rid) {
|
|
||||||
usleep(net_latency * 1000);
|
|
||||||
return origWrite(xid,p,rid);
|
|
||||||
}
|
|
||||||
static const byte * (*origRead)(int xid, Page *p, recordid rid);
|
|
||||||
static const byte * slowRead(int xid, Page *p, recordid rid) {
|
|
||||||
usleep(net_latency * 1000);
|
|
||||||
return origRead(xid,p,rid);
|
|
||||||
}
|
|
||||||
|
|
||||||
int main (int argc, char ** argv) {
|
int main (int argc, char ** argv) {
|
||||||
unlink("storefile.txt");
|
unlink("storefile.txt");
|
||||||
|
@ -97,22 +15,15 @@ int main (int argc, char ** argv) {
|
||||||
Tinit();
|
Tinit();
|
||||||
|
|
||||||
alloc_rids(num_rids,&rids,&fast);
|
alloc_rids(num_rids,&rids,&fast);
|
||||||
|
|
||||||
|
if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) {
|
||||||
|
emulate_remote_pages();
|
||||||
|
}
|
||||||
|
|
||||||
if(writeback) {
|
if(writeback) {
|
||||||
build_cache(rids,&cache,num_rids);
|
build_cache(rids,&cache,num_rids);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!(strcmp(mode, "normal-net")&&strcmp(mode,"writeback-net"))) {
|
|
||||||
origWrite = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite;
|
|
||||||
origRead = stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead;
|
|
||||||
|
|
||||||
// xxx a bit of cheating; don't pay latency for lsn write
|
|
||||||
// (could amortize w/ recordWrite)
|
|
||||||
|
|
||||||
stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordWrite = slowWrite;
|
|
||||||
stasis_page_impl_get(SLOTTED_LSN_FREE_PAGE)->recordRead = slowRead;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
lsn_t last_lsn = 0;
|
lsn_t last_lsn = 0;
|
||||||
for(long long i = 0; i < num_xacts; i++) {
|
for(long long i = 0; i < num_xacts; i++) {
|
||||||
int xid = Tbegin();
|
int xid = Tbegin();
|
||||||
|
|
|
@ -77,14 +77,12 @@ stasis_log_t* stasis_log_file = 0;
|
||||||
|
|
||||||
static int pendingCommits;
|
static int pendingCommits;
|
||||||
|
|
||||||
TransactionLog LogTransBegin(stasis_log_t* log, int xid) {
|
void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* tl) {
|
||||||
TransactionLog tl;
|
tl->xid = xid;
|
||||||
tl.xid = xid;
|
|
||||||
|
|
||||||
DEBUG("Log Begin %d\n", xid);
|
DEBUG("Log Begin %d\n", xid);
|
||||||
tl.prevLSN = -1;
|
tl->prevLSN = -1;
|
||||||
tl.recLSN = -1;
|
tl->recLSN = -1;
|
||||||
return tl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
|
static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
|
||||||
|
@ -93,8 +91,11 @@ static lsn_t LogTransCommon(stasis_log_t* log, TransactionLog * l, int type) {
|
||||||
|
|
||||||
log->write_entry(log, e);
|
log->write_entry(log, e);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&l->mut);
|
||||||
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
||||||
l->prevLSN = e->LSN;
|
l->prevLSN = e->LSN;
|
||||||
|
pthread_mutex_unlock(&l->mut);
|
||||||
|
|
||||||
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
|
DEBUG("Log Common %d, LSN: %ld type: %ld (prevLSN %ld)\n", e->xid,
|
||||||
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
|
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
|
||||||
|
|
||||||
|
@ -113,8 +114,10 @@ static lsn_t LogTransCommonPrepare(stasis_log_t* log, TransactionLog * l) {
|
||||||
e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e));
|
e->xid, e->prevLSN, l->recLSN, getPrepareRecLSN(e));
|
||||||
log->write_entry(log, e);
|
log->write_entry(log, e);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&l->mut);
|
||||||
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
||||||
l->prevLSN = e->LSN;
|
l->prevLSN = e->LSN;
|
||||||
|
pthread_mutex_unlock(&l->mut);
|
||||||
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n",
|
DEBUG("Log Common prepare XXX %d, LSN: %ld type: %ld (prevLSN %ld)\n",
|
||||||
e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
|
e->xid, (long int)e->LSN, (long int)e->type, (long int)e->prevLSN);
|
||||||
|
|
||||||
|
@ -137,9 +140,10 @@ LogEntry * LogUpdate(stasis_log_t* log, TransactionLog * l,
|
||||||
log->write_entry(log, e);
|
log->write_entry(log, e);
|
||||||
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
|
DEBUG("Log Update %d, LSN: %ld type: %ld (prevLSN %ld) (arg_size %ld)\n", e->xid,
|
||||||
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
|
(long int)e->LSN, (long int)e->type, (long int)e->prevLSN, (long int) arg_size);
|
||||||
|
pthread_mutex_lock(&l->mut);
|
||||||
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
if(l->prevLSN == -1) { l->recLSN = e->LSN; }
|
||||||
l->prevLSN = e->LSN;
|
l->prevLSN = e->LSN;
|
||||||
|
pthread_mutex_unlock(&l->mut);
|
||||||
return e;
|
return e;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
#include <stasis/transactional.h>
|
#include <stasis/transactional.h>
|
||||||
#include <stasis/logger/reorderingHandle.h>
|
#include <stasis/logger/reorderingHandle.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
|
long stasis_log_reordering_usleep_after_flush = 0;
|
||||||
|
|
||||||
static void* stasis_log_reordering_handle_worker(void * a) {
|
static void* stasis_log_reordering_handle_worker(void * a) {
|
||||||
stasis_log_reordering_handle_t * h = (typeof(h))a;
|
stasis_log_reordering_handle_t * h = (typeof(h))a;
|
||||||
pthread_mutex_lock(&h->mut);
|
pthread_mutex_lock(&h->mut);
|
||||||
|
@ -17,10 +20,6 @@ static void* stasis_log_reordering_handle_worker(void * a) {
|
||||||
assert(e->xid != INVALID_XID);
|
assert(e->xid != INVALID_XID);
|
||||||
chunk_len += sizeofLogEntry(e);
|
chunk_len += sizeofLogEntry(e);
|
||||||
|
|
||||||
h->cur_len--;
|
|
||||||
h->phys_size -= sizeofLogEntry(e);
|
|
||||||
h->cur_off = (h->cur_off+1)%h->max_len;
|
|
||||||
|
|
||||||
if(h->queue[h->cur_off].p) {
|
if(h->queue[h->cur_off].p) {
|
||||||
Page * p = h->queue[h->cur_off].p;
|
Page * p = h->queue[h->cur_off].p;
|
||||||
writelock(p->rwlatch,0);
|
writelock(p->rwlatch,0);
|
||||||
|
@ -30,11 +29,19 @@ static void* stasis_log_reordering_handle_worker(void * a) {
|
||||||
} /*
|
} /*
|
||||||
else it's the caller's problem; flush(), and checking the
|
else it's the caller's problem; flush(), and checking the
|
||||||
xaction table for prevLSN is their friend. */
|
xaction table for prevLSN is their friend. */
|
||||||
|
|
||||||
|
h->cur_len--;
|
||||||
|
h->phys_size -= sizeofLogEntry(e);
|
||||||
|
h->cur_off = (h->cur_off+1)%h->max_len;
|
||||||
|
|
||||||
}
|
}
|
||||||
if(chunk_len > 0) {
|
if(chunk_len > 0) {
|
||||||
lsn_t to_force = h->l->prevLSN;
|
lsn_t to_force = h->l->prevLSN;
|
||||||
pthread_mutex_unlock(&h->mut);
|
pthread_mutex_unlock(&h->mut);
|
||||||
LogForce(h->log, to_force, LOG_FORCE_COMMIT);
|
LogForce(h->log, to_force, LOG_FORCE_COMMIT);
|
||||||
|
if(stasis_log_reordering_usleep_after_flush) {
|
||||||
|
usleep(stasis_log_reordering_usleep_after_flush);
|
||||||
|
}
|
||||||
pthread_mutex_lock(&h->mut);
|
pthread_mutex_lock(&h->mut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,13 +76,14 @@ int TsetReorderableWriteBack(int xid, stasis_log_reordering_handle_t * h,
|
||||||
byte * b = (byte*)&a[2];
|
byte * b = (byte*)&a[2];
|
||||||
memcpy(b,dat,len);
|
memcpy(b,dat,len);
|
||||||
memcpy(b+len,dat,len);
|
memcpy(b+len,dat,len);
|
||||||
|
lsn_t ret = 0;
|
||||||
if(!h) {
|
if(!h) {
|
||||||
TwritebackUpdate(xid,page,buf,sz,OPERATION_SET_LSN_FREE);
|
ret = TwritebackUpdate(xid,page,buf,sz,OPERATION_SET_LSN_FREE);
|
||||||
} else {
|
} else {
|
||||||
TreorderableWritebackUpdate(xid,h,page,buf,sz,OPERATION_SET_LSN_FREE);
|
TreorderableWritebackUpdate(xid,h,page,buf,sz,OPERATION_SET_LSN_FREE);
|
||||||
}
|
}
|
||||||
free(buf);
|
free(buf);
|
||||||
return 0;
|
return ret;
|
||||||
}
|
}
|
||||||
int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const void * dat, const void * olddat) {
|
int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len, const void * dat, const void * olddat) {
|
||||||
return TsetReorderableWriteBack(xid,0,page,off,len,dat,olddat);
|
return TsetReorderableWriteBack(xid,0,page,off,len,dat,olddat);
|
||||||
|
|
|
@ -52,6 +52,9 @@ pthread_mutex_t transactional_2_mutex;
|
||||||
|
|
||||||
void setupOperationsTable() {
|
void setupOperationsTable() {
|
||||||
memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
|
memset(XactionTable, INVALID_XTABLE_XID, sizeof(TransactionLog)*MAX_TRANSACTIONS);
|
||||||
|
for(int i = 0; i < MAX_TRANSACTIONS; i++) {
|
||||||
|
pthread_mutex_init(&XactionTable[i].mut,0);
|
||||||
|
}
|
||||||
// @todo clean out unused constants...
|
// @todo clean out unused constants...
|
||||||
operationsTable[OPERATION_SET] = getSet();
|
operationsTable[OPERATION_SET] = getSet();
|
||||||
operationsTable[OPERATION_SET_INVERSE] = getSetInverse();
|
operationsTable[OPERATION_SET_INVERSE] = getSetInverse();
|
||||||
|
@ -282,7 +285,7 @@ int Tbegin() {
|
||||||
|
|
||||||
pthread_mutex_unlock(&transactional_2_mutex);
|
pthread_mutex_unlock(&transactional_2_mutex);
|
||||||
|
|
||||||
XactionTable[index] = LogTransBegin(stasis_log_file, xidCount_tmp);
|
LogTransBegin(stasis_log_file, xidCount_tmp, &XactionTable[index]);
|
||||||
|
|
||||||
if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); }
|
if(globalLockManager.begin) { globalLockManager.begin(XactionTable[index].xid); }
|
||||||
|
|
||||||
|
@ -317,6 +320,7 @@ void TreorderableUpdate(int xid, void * hp, pageid_t page,
|
||||||
stasis_log_reordering_handle_t * h = (typeof(h))hp;
|
stasis_log_reordering_handle_t * h = (typeof(h))hp;
|
||||||
assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid);
|
assert(xid >= 0 && XactionTable[xid % MAX_TRANSACTIONS].xid == xid);
|
||||||
Page * p = loadPage(xid, page);
|
Page * p = loadPage(xid, page);
|
||||||
|
assert(p);
|
||||||
try {
|
try {
|
||||||
if(globalLockManager.writeLockPage) {
|
if(globalLockManager.writeLockPage) {
|
||||||
globalLockManager.writeLockPage(xid, p->id);
|
globalLockManager.writeLockPage(xid, p->id);
|
||||||
|
@ -363,7 +367,7 @@ void TreorderableWritebackUpdate(int xid, void* hp,
|
||||||
pthread_mutex_lock(&h->mut);
|
pthread_mutex_lock(&h->mut);
|
||||||
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen);
|
LogEntry * e = allocUpdateLogEntry(-1, xid, op, page, dat, datlen);
|
||||||
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e));
|
stasis_log_reordering_handle_append(h, 0, op, dat, datlen, sizeofLogEntry(e));
|
||||||
|
pthread_mutex_unlock(&h->mut);
|
||||||
}
|
}
|
||||||
compensated_function void TupdateStr(int xid, pageid_t page,
|
compensated_function void TupdateStr(int xid, pageid_t page,
|
||||||
const char *dat, size_t datlen, int op) {
|
const char *dat, size_t datlen, int op) {
|
||||||
|
|
|
@ -65,6 +65,7 @@ typedef struct TransactionLog {
|
||||||
int xid;
|
int xid;
|
||||||
lsn_t prevLSN;
|
lsn_t prevLSN;
|
||||||
lsn_t recLSN;
|
lsn_t recLSN;
|
||||||
|
pthread_mutex_t mut;
|
||||||
} TransactionLog;
|
} TransactionLog;
|
||||||
|
|
||||||
typedef struct stasis_log_t stasis_log_t;
|
typedef struct stasis_log_t stasis_log_t;
|
||||||
|
@ -202,7 +203,7 @@ void LogForce(stasis_log_t* log, lsn_t lsn, stasis_log_force_mode_t mode);
|
||||||
Inform the logging layer that a new transaction has begun, and
|
Inform the logging layer that a new transaction has begun, and
|
||||||
obtain a handle.
|
obtain a handle.
|
||||||
*/
|
*/
|
||||||
TransactionLog LogTransBegin(stasis_log_t* log, int xid);
|
void LogTransBegin(stasis_log_t* log, int xid, TransactionLog* l);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
Write a transaction PREPARE to the log tail. Blocks until the
|
Write a transaction PREPARE to the log tail. Blocks until the
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
#include <stasis/common.h>
|
#include <stasis/common.h>
|
||||||
#include <stasis/logger/logger2.h>
|
#include <stasis/logger/logger2.h>
|
||||||
|
|
||||||
|
extern long stasis_log_reordering_usleep_after_flush;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
Page * p;
|
Page * p;
|
||||||
unsigned int op;
|
unsigned int op;
|
||||||
|
|
|
@ -8,4 +8,7 @@ int TsetReorderable(int xid, stasis_log_reordering_handle_t * h,
|
||||||
recordid rid, const void *dat);
|
recordid rid, const void *dat);
|
||||||
int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len,
|
int TsetWriteBack(int xid, pageid_t page, pageoff_t off, pageoff_t len,
|
||||||
const void * dat, const void * olddat);
|
const void * dat, const void * olddat);
|
||||||
|
int TsetReorderableWriteBack(int xid, stasis_log_reordering_handle_t * h,
|
||||||
|
pageid_t page, pageoff_t off, pageoff_t len,
|
||||||
|
const void * dat, const void * olddat);
|
||||||
#endif //__LSN_FREE_SET_H
|
#endif //__LSN_FREE_SET_H
|
||||||
|
|
|
@ -387,9 +387,9 @@ START_TEST(loggerCheckThreaded) {
|
||||||
Tdeinit();
|
Tdeinit();
|
||||||
|
|
||||||
} END_TEST
|
} END_TEST
|
||||||
|
void setupOperationsTable();
|
||||||
void reopenLogWorkload(int truncating) {
|
void reopenLogWorkload(int truncating) {
|
||||||
|
setupOperationsTable();
|
||||||
stasis_truncation_automatic = 0;
|
stasis_truncation_automatic = 0;
|
||||||
|
|
||||||
const int ENTRY_COUNT = 1000;
|
const int ENTRY_COUNT = 1000;
|
||||||
|
@ -408,7 +408,9 @@ void reopenLogWorkload(int truncating) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int xid = 1;
|
int xid = 1;
|
||||||
TransactionLog l = LogTransBegin(stasis_log_file, xid);
|
TransactionLog l;
|
||||||
|
pthread_mutex_init(&l.mut,0);
|
||||||
|
LogTransBegin(stasis_log_file, xid, &l);
|
||||||
lsn_t startLSN = 0;
|
lsn_t startLSN = 0;
|
||||||
|
|
||||||
LogEntry * entries[ENTRY_COUNT];
|
LogEntry * entries[ENTRY_COUNT];
|
||||||
|
|
Loading…
Reference in a new issue