O_DIRECT (sort of...)

This commit is contained in:
Sears Russell 2004-07-27 21:30:54 +00:00
parent 54ba9b0347
commit 78eb2cbf6a
14 changed files with 250 additions and 181 deletions

View file

@ -105,13 +105,12 @@ typedef struct Page_s Page;
/**
* @param pageid ID of the page you want to load
* @return fully formed Page type
* @return page with -1 ID if page not found
*/
Page * loadPage(int pageid);
/**
loadPage acquires a lock when it is called, effectively pinning it
in memory. realeasePage releases this lock.
in memory. releasePage releases this lock.
*/
void releasePage(Page * p);
@ -123,28 +122,6 @@ void releasePage(Page * p);
*/
int bufInit();
/**
* allocate a record. This must be done in two phases. The first
* phase reserves a slot, and produces a log entry. The second phase
* sets up the slot according to the contents of the log entry.
*
* Ralloc implements the first phase.
*
* @param xid The active transaction.
* @param size The size of the new record
* @return allocated record
*
* @see slotRalloc the implementation of the second phase.
*/
recordid ralloc(int xid, long size);
/**
* allocate a record at a given slot. (Useful for recovery.)
*
* @see ralloc
*/
void slotRalloc(Page * page, lsn_t lsn, recordid rid);
/**
* @param pageid ID of page you want to read

View file

@ -99,8 +99,8 @@ extern int errno;
#define lsn_t long
/*#define DEBUGGING
#define PROFILE_LATCHES */
/*#define DEBUGGING */
/*#define PROFILE_LATCHES */
#ifdef DEBUGGING
/** @todo Files that use DEBUG have to pull in stdio.h, which is a pain! */

View file

@ -77,12 +77,12 @@ terms specified in this license.
/*#define MAX_BUFFER_SIZE 100003 */
/*#define MAX_BUFFER_SIZE 10007*/
#define MAX_BUFFER_SIZE 71
/* 71 */
/*#define MAX_BUFFER_SIZE 5003*/
/*#define MAX_BUFFER_SIZE 71 */
#define MAX_BUFFER_SIZE 7
#define BUFFER_ASOOCIATIVE 2
#define MAX_TRANSACTIONS 32
#define MAX_TRANSACTIONS 1000
/** Operation types */

View file

@ -3,6 +3,6 @@
lib_LIBRARIES=liblladd.a
#liblladd_a_LIBADD=logger/liblogger.a operations/liboperations.a
# removed: recovery.c transactional.c logger.c logger/logparser.c logger/logstreamer.c
liblladd_a_SOURCES=common.c stats.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c
liblladd_a_SOURCES=common.c stats.c io.c bufferManager.c linkedlist.c operations.c pageFile.c pageCache.c page.c blobManager.c recovery2.c transactional2.c logger/logEntry.c logger/logWriter.c logger/logHandle.c logger/logger2.c operations/decrement.c operations/increment.c operations/prepare.c operations/set.c operations/alloc.c #operations/lladdhash.c
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -4,15 +4,16 @@
#include <assert.h>
#include <lladd/transactional.h>
#include "pageFile.h"
#include <lladd/bufferManager.h>
#include <lladd/constants.h>
#include "latches.h"
#include "blobManager.h"
#include "io.h"
#include <pbl/pbl.h>
#include "page.h"
#include <stdio.h>
@ -296,21 +297,17 @@ void readBlob(int xid, Page * p, recordid rid, void * buf) {
funlockfile(fd);
}
/** @todo dirtyBlobs should contain the highest LSN that wrote to the
current version of the dirty blob, and the lsn field should be
checked to be sure that it increases monotonically. */
void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
/**
Examines the blob in question, marks it dirty, and returns the
appropriate file descriptor.
*/
static FILE * getDirtyFD(int xid, Page * p, lsn_t lsn, recordid rid) {
lsn_t * dirty = tripleHashLookup(xid, rid);
FILE * fd;
blob_record_t rec;
/* First, determine if the blob is dirty. */
lsn_t * dirty = tripleHashLookup(xid, rid);
blob_record_t rec;
long offset;
FILE * fd;
int readcount;
DEBUG("Writing blob (size %ld)\n", rid.size);
/* Tread() raw record */
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
@ -321,9 +318,6 @@ void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
assert(lsn > *dirty);
*dirty = lsn; /* Updates value in triple hash (works because of pointer aliasing.) */
DEBUG("Blob already dirty.\n");
} else {
DEBUG("Marking blob dirty.\n");
tripleHashInsert(xid, rid, lsn);
@ -336,19 +330,52 @@ void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
fd = rec.fd ? blobf1 : blobf0; /* rec's fd is up-to-date, so use it directly */
return fd;
}
/**
   Overwrite a sub-range of a blob in its backing file.

   getDirtyFD() is called first (it marks the blob dirty and returns
   the stream to write to), then the raw blob record is read to find
   where the blob starts in that stream (rec.offset).  `length` bytes
   from `buf` are then written starting `offset` bytes past rec.offset.

   @param xid    transaction id, passed to getDirtyFD() and readRawRecord()
   @param p      page passed to getDirtyFD() and readRawRecord()
   @param lsn    LSN passed to getDirtyFD()
   @param rid    record id of the blob
   @param buf    bytes to write
   @param offset distance from the start of the blob to the first byte written
   @param length number of bytes to write

   Aborts (via assert) if the seek does not land on the requested
   position or if fwrite() does not report one complete item written.

   NOTE(review): nothing here checks offset + length against rec.size;
   presumably callers guarantee the range lies inside the blob -- confirm.
*/
void setRangeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf, long offset, long length) {
  FILE * fd;
  blob_record_t rec;
  long target;
  long pos;
  size_t written;

  fd = getDirtyFD(xid, p, lsn, rid);
  readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));

  /* Absolute position of the first byte to overwrite. */
  target = rec.offset + offset;

  flockfile(fd);

  /* Keep the seek result in its own variable: storing it back into
     `offset` and comparing against rec.offset (as was done before)
     fails for every non-zero offset. */
  pos = myFseek(fd, target, SEEK_SET);
  assert(pos == target);

  written = fwrite(buf, length, 1, fd);
  assert(1 == written);

  funlockfile(fd);
}
/** @todo dirtyBlobs should contain the highest LSN that wrote to the
current version of the dirty blob, and the lsn field should be
checked to be sure that it increases monotonically. */
void writeBlob(int xid, Page * p, lsn_t lsn, recordid rid, const void * buf) {
long offset;
FILE * fd;
int readcount;
blob_record_t rec;
DEBUG("Writing blob (size %ld)\n", rid.size);
fd = getDirtyFD(xid, p, lsn, rid);
readRawRecord(xid, p, rid, &rec, sizeof(blob_record_t));
DEBUG("Writing at offset = %d, size = %ld\n", rec.offset, rec.size);
flockfile(fd);
offset = myFseek(fd, rec.offset, SEEK_SET);
assert(offset == rec.offset);
readcount = fwrite(buf, rec.size, 1, fd);
funlockfile(fd);
assert(1 == readcount);
/* No need to update the raw blob record. */
funlockfile(fd);
/* No need to update the raw blob record. */
}
/** @todo check to see if commitBlobs actually needs to flush blob
files when it's called (are there any dirty blobs associated with

View file

@ -59,23 +59,11 @@ terms specified in this license.
#include "pageFile.h"
#include <pbl/pbl.h>
/**
Invariant: This lock should be held while updating lastFreepage, or
while performing any operation that may decrease the amount of
freespace in the page that lastFreepage refers to.
Since pageCompact and pageDeRalloc may only increase this value,
they do not need to hold this lock. Since bufferManager is the
only place where pageRalloc is called, pageRalloc does not obtain
this lock.
*/
static pblHashTable_t *activePages; /* page lookup */
static pthread_mutex_t loadPagePtr_mutex;
static pthread_mutex_t lastFreepage_mutex;
static unsigned int lastFreepage = 0;
static Page * dummy_page;
int bufInit() {
@ -84,11 +72,9 @@ int bufInit() {
openPageFile();
pthread_mutex_init(&lastFreepage_mutex , NULL);
pthread_mutex_init(&loadPagePtr_mutex, NULL);
activePages = pblHtCreate();
lastFreepage = 0;
activePages = pblHtCreate();
dummy_page = pageAlloc(-1);
pageRealloc(dummy_page, -1);
@ -152,36 +138,6 @@ void releasePage (Page * p) {
unlock(p->loadlatch);
}
Page * lastRallocPage = 0;
/**
   Reserve a slot for a new record of the given size.

   With lastFreepage_mutex held, pages are loaded one at a time
   starting at lastFreepage; lastFreepage is advanced past every page
   whose freespace() is smaller than `size`.  The record is then
   allocated from the first page that qualifies, via pageRalloc().

   @todo ralloc ignores its xid parameter; change the interface?

   @param xid  unused (see the todo above)
   @param size requested record size; asserted to be either below
               BLOB_THRESHOLD_SIZE or equal to BLOB_SLOT
   @return the recordid returned by pageRalloc()
*/
recordid ralloc(int xid, long size) {
  assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);

  pthread_mutex_lock(&lastFreepage_mutex);

  Page * candidate = loadPage(lastFreepage);
  while(freespace(candidate) < size) {
    /* Not enough room here; unpin this page and try the next one. */
    releasePage(candidate);
    lastFreepage++;
    candidate = loadPage(lastFreepage);
  }

  recordid allocated = pageRalloc(candidate, size);
  releasePage(candidate);

  pthread_mutex_unlock(&lastFreepage_mutex);

  return allocated;
}
void writeRecord(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
/* Page *p; */

View file

@ -45,8 +45,8 @@ terms specified in this license.
#include <lladd/transactional.h>
#include "logWriter.h"
#include "logHandle.h"
#include "../pageFile.h"
#include "latches.h"
#include "io.h"
#include <assert.h>
#include <stdio.h>

View file

@ -121,6 +121,22 @@ static int getSlotLength(byte *memAddr, int slot) ;
static void setSlotOffset(byte *memAddr, int slot, int offset) ;
static void setSlotLength(byte *memAddr, int slot, int length) ;
/**
Invariant: This lock should be held while updating lastFreepage, or
while performing any operation that may decrease the amount of
freespace in the page that lastFreepage refers to.
Since pageCompact and pageDeRalloc may only increase this value,
they do not need to hold this lock. pageRalloc does not obtain
this lock itself; ralloc() calls it with the lock already held.
(This note originally said bufferManager was the only caller of pageRalloc.)
*/
static pthread_mutex_t lastFreepage_mutex;
static unsigned int lastFreepage = 0;
/** @todo replace static ints in page.c with #defines. */
/* ------ */
@ -412,6 +428,10 @@ void pageInit() {
pool[i].memAddr = malloc(PAGE_SIZE);
}
pthread_mutex_init(&lastFreepage_mutex , NULL);
lastFreepage = 0;
}
void pageDeInit() {
@ -528,6 +548,36 @@ int freespace(Page * page) {
return ret;
}
/**
   Reserve a slot for a new record of the given size.

   With lastFreepage_mutex held, pages are loaded one at a time
   starting at lastFreepage; lastFreepage is advanced past every page
   whose freespace() is smaller than `size`.  The record is then
   allocated from the first page that qualifies, via pageRalloc().

   @todo ralloc ignores its xid parameter; change the interface?

   @param xid  unused (see the todo above)
   @param size requested record size; asserted to be either below
               BLOB_THRESHOLD_SIZE or equal to BLOB_SLOT
   @return the recordid returned by pageRalloc()
*/
recordid ralloc(int xid, long size) {
  assert(size < BLOB_THRESHOLD_SIZE || size == BLOB_SLOT);

  pthread_mutex_lock(&lastFreepage_mutex);

  Page * candidate = loadPage(lastFreepage);
  while(freespace(candidate) < size) {
    /* Not enough room here; unpin this page and try the next one. */
    releasePage(candidate);
    lastFreepage++;
    candidate = loadPage(lastFreepage);
  }

  recordid allocated = pageRalloc(candidate, size);
  releasePage(candidate);

  pthread_mutex_unlock(&lastFreepage_mutex);

  return allocated;
}
recordid pageRalloc(Page * page, int size) {
int freeSpace;
int numSlots;

View file

@ -215,6 +215,23 @@ lsn_t pageReadLSN(const Page * page);
*/
int freespace(Page * page);
/**
* allocate a record. This must be done in two phases. The first
* phase reserves a slot, and produces a log entry. The second phase
* sets up the slot according to the contents of the log entry.
*
* Ralloc implements the first phase.
*
* @param xid The active transaction.
* @param size The size of the new record
* @return allocated record
*
* @see slotRalloc the implementation of the second phase.
*/
recordid ralloc(int xid, long size);
/**
* assumes that the page is already loaded in memory. It takes as
* parameters a Page and the size in bytes of the new record. pageRalloc()

View file

@ -12,12 +12,25 @@
#include <assert.h>
#include "logger/logWriter.h"
static FILE * stable = NULL;
#include <sys/types.h>
#include <sys/stat.h>
#define __USE_GNU /* For O_DIRECT.. */
#include <fcntl.h>
#include <unistd.h>
static int stable = -1;
/** Defined in bufferManager.c */
extern pthread_mutex_t add_pending_mutex;
static pthread_mutex_t stable_mutex;
static long myLseek(int f, long offset, int whence);
static long myLseekNoLock(int f, long offset, int whence);
void pageRead(Page *ret) {
long fileSize;
/* long fileSize; */
long pageoffset;
long offset;
@ -25,10 +38,9 @@ void pageRead(Page *ret) {
/** @todo pageRead() is using fseek to calculate the file size on each read, which is inefficient. */
pageoffset = ret->id * PAGE_SIZE;
flockfile(stable);
fileSize = myFseekNoLock(stable, 0, SEEK_END);
/* flockfile(stable); */
pthread_mutex_lock(&stable_mutex);
/* fileSize = myLseekNoLock(stable, 0, SEEK_END); */
/* DEBUG("Reading page %d\n", ret->id); */
@ -41,25 +53,40 @@ void pageRead(Page *ret) {
}
assert(ret->memAddr); */
if ((ret->id)*PAGE_SIZE >= fileSize) {
myFseekNoLock(stable, (1+ ret->id) * PAGE_SIZE -1, SEEK_SET);
/** @todo was manual extension of the storefile really necessary? */
/* if ((ret->id)*PAGE_SIZE >= fileSize) {
myLseekNoLock(stable, (ret->id - 1) * PAGE_SIZE -1, SEEK_SET);
if(1 != fwrite("", 1, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof extending storefile!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error extending storefile! %d", ferror(stable)); fflush(NULL); abort(); }
}
}*/
}
offset = myFseekNoLock(stable, pageoffset, SEEK_SET);
offset = myLseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(1 != fread(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof reading!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error reading stream! %d", ferror(stable)); fflush(NULL); abort(); }
int read_size;
read_size = read(stable, ret->memAddr, PAGE_SIZE);
if(read_size != PAGE_SIZE) {
if (!read_size) {
long fileSize = myLseekNoLock(stable, 0, SEEK_END);
offset = myLseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
if(fileSize <= pageoffset) {
memset(ret->memAddr, 0, PAGE_SIZE);
write(stable, ret->memAddr, PAGE_SIZE);
}
funlockfile(stable);
} else if(read_size == -1) {
perror("pageFile.c couldn't read");
fflush(NULL);
assert(0);
} else {
printf("pageFile.c readfile: read_size = %d, errno = %d\n", read_size, errno);
abort();
}
}
pthread_mutex_unlock(&stable_mutex);
}
@ -75,64 +102,91 @@ void pageWrite(Page * ret) {
syncLog();
}
flockfile(stable);
offset = myFseekNoLock(stable, pageoffset, SEEK_SET);
pthread_mutex_lock(&stable_mutex);
offset = myLseekNoLock(stable, pageoffset, SEEK_SET);
assert(offset == pageoffset);
assert(ret->memAddr);
/* DEBUG("Writing page %d\n", ret->id); */
if(1 != fwrite(ret->memAddr, PAGE_SIZE, 1, stable)) {
if(feof(stable)) { printf("Unexpected eof writing!\n"); fflush(NULL); abort(); }
if(ferror(stable)) { printf("Error writing stream! %d", ferror(stable)); fflush(NULL); abort(); }
int write_ret = write(stable, ret->memAddr, PAGE_SIZE);
if(-1 == write_ret) {
perror("pageFile.c couldn't write");
fflush(NULL);
abort();
} else if(0 == write_ret) {
/* now what? */
printf("write_ret is zero\n");
fflush(NULL);
abort();
} else if(write_ret != PAGE_SIZE){
printf("write_ret is %d\n", write_ret);
fflush(NULL);
abort();
}
funlockfile(stable);
pthread_mutex_unlock(&stable_mutex);
}
void openPageFile() {
DEBUG("Opening storefile.\n");
if( ! (stable = fopen(STORE_FILE, "r+"))) { /* file may not exist */
/* if( ! (stable = fopen(STORE_FILE, "r+"))) { / * file may not exist * /
byte* zero = calloc(1, PAGE_SIZE);
if(!(stable = fopen(STORE_FILE, "w+"))) { perror("Couldn't open or create store file"); abort(); }
/ * Write out one page worth of zeros to get started. * /
if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); }
/ * if(1 != fwrite(zero, PAGE_SIZE, 1, stable)) { assert (0); } * /
free(zero);
}
DEBUG("storefile opened.\n");
*/
stable = open (STORE_FILE, O_CREAT | O_RDWR | O_DIRECT, S_IRWXU | S_IRWXG | S_IRWXO);
if(stable == -1) {
perror("couldn't open storefile");
fflush(NULL);
abort();
}
pthread_mutex_init(&stable_mutex, NULL);
}
void closePageFile() {
int ret = fclose(stable);
assert(!ret);
stable = NULL;
int ret = close(stable);
if(-1 == ret) {
perror("Couldn't close storefile.");
fflush(NULL);
abort();
}
stable = -1;
}
long myFseek(FILE * f, long offset, int whence) {
long myLseek(int f, long offset, int whence) {
long ret;
flockfile(f);
ret = myFseekNoLock(f, offset, whence);
funlockfile(f);
pthread_mutex_lock(&stable_mutex);
ret = myLseekNoLock(f, offset, whence);
pthread_mutex_unlock(&stable_mutex);
return ret;
}
long myFseekNoLock(FILE * f, long offset, int whence) {
long ret;
if(0 != fseek(f, offset, whence)) { perror ("fseek"); fflush(NULL); abort(); }
if(-1 == (ret = ftell(f))) { perror("ftell"); fflush(NULL); abort(); }
long myLseekNoLock(int f, long offset, int whence) {
assert(! ( offset % 4096 ));
long ret = lseek(f, offset, whence);
if(ret == -1) {
perror("Couldn't seek.");
fflush(NULL);
abort();
}
return ret;
}
void myFwrite(const void * dat, long size, FILE * f) {
/*void myFwrite(const void * dat, long size, FILE * f) {
int nmemb = fwrite(dat, size, 1, f);
/ * test * /
if(nmemb != 1) {
@ -141,10 +195,10 @@ void myFwrite(const void * dat, long size, FILE * f) {
/ * return FILE_WRITE_OPEN_ERROR; * /
}
}
}*/
long pageCount() {
long fileSize = myFseek(stable, 0, SEEK_END);
long fileSize = myLseek(stable, 0, SEEK_END);
assert(! (fileSize % PAGE_SIZE));
return fileSize / PAGE_SIZE;

View file

@ -2,7 +2,7 @@
#ifndef __PAGE_FILE_H
#define __PAGE_FILE_H
#include "page.h"
#include <stdio.h>
/**
* Write page to disk, including correct LSN. Doing so may require a
* call to logSync(). There is not much that can be done to avoid
@ -46,10 +46,6 @@ long pageCount();
void openPageFile();
void closePageFile();
long myFseek(FILE * f, long offset, int whence);
long myFseekNoLock(FILE * f, long offset, int whence);
void myFwrite(const void * dat, long size, FILE * f);
void finalize(Page * p);
#endif /* __PAGE_FILE_H */

View file

@ -25,7 +25,7 @@ int xidCount = 0;
pthread_mutex_t transactional_2_mutex;
#define INVALID_XTABLE_XID -1
#define PENDING_XTABLE_XID -2
/** Needed for debugging -- sometimes we don't want to run all of Tinit() */
void setupOperationsTable() {
@ -65,8 +65,10 @@ int Tbegin() {
pthread_mutex_lock(&transactional_2_mutex);
if( numActiveXactions == MAX_TRANSACTIONS )
if( numActiveXactions == MAX_TRANSACTIONS ) {
pthread_mutex_unlock(&transactional_2_mutex);
return EXCEED_MAX_TRANSACTIONS;
}
else
numActiveXactions++;
@ -79,14 +81,15 @@ int Tbegin() {
}
xidCount_tmp = xidCount;
/** @todo Don't want to block while we're logging... */
assert( i < MAX_TRANSACTIONS );
XactionTable[index] = LogTransBegin(xidCount_tmp);
XactionTable[index].xid = PENDING_XTABLE_XID;
pthread_mutex_unlock(&transactional_2_mutex);
XactionTable[index] = LogTransBegin(xidCount_tmp);
return XactionTable[index].xid;
}

View file

@ -6,6 +6,6 @@ else
TESTS =
endif
noinst_PROGRAMS = $(TESTS)
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a -lefence
LDADD = @CHECK_LIBS@ $(top_builddir)/src/lladd/liblladd.a $(top_builddir)/src/pbl/libpbl.a $(top_builddir)/src/libdfa/librw.a #-lefence
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log
AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -47,8 +47,8 @@ terms specified in this license.
#include <lladd/transactional.h>
#include "../check_includes.h"
#define LOG_NAME "check_transactional2.log"
#define THREAD_COUNT 10
#define RECORDS_PER_THREAD 500
#define THREAD_COUNT 5
#define RECORDS_PER_THREAD 5000
void arraySet(int * array, int val) {
int i;
@ -433,17 +433,6 @@ START_TEST(transactional_blobs_threads_abort) {
}
Tdeinit();
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
sleep (10000000);
} END_TEST
/**
@ -455,14 +444,14 @@ Suite * check_suite(void) {
TCase *tc = tcase_create("transactional_smokeTest");
/* Sub tests are added, one per line, here */
tcase_add_test(tc, transactional_smokeTest);
tcase_add_test(tc, transactional_blobSmokeTest);
tcase_add_test(tc, transactional_nothreads_commit);
tcase_add_test(tc, transactional_threads_commit);
tcase_add_test(tc, transactional_nothreads_abort);
/* tcase_add_test(tc, transactional_smokeTest);
tcase_add_test(tc, transactional_blobSmokeTest); */
/* tcase_add_test(tc, transactional_nothreads_commit);
tcase_add_test(tc, transactional_threads_commit); */
/* tcase_add_test(tc, transactional_nothreads_abort); */
tcase_add_test(tc, transactional_threads_abort);
tcase_add_test(tc, transactional_blobs_nothreads_abort);
tcase_add_test(tc, transactional_blobs_threads_abort);
/* tcase_add_test(tc, transactional_blobs_threads_abort); */
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc);