diff --git a/src/stasis/operations.c b/src/stasis/operations.c index eaf2fe0..7ae29d9 100644 --- a/src/stasis/operations.c +++ b/src/stasis/operations.c @@ -90,7 +90,9 @@ void stasis_operation_table_init() { stasis_operation_impl_register(stasis_op_impl_noop()); stasis_operation_impl_register(stasis_op_impl_array_list_header_init()); + stasis_operation_impl_register(stasis_op_impl_page_initialize()); + stasis_operation_impl_register(stasis_op_impl_multipage_initialize()); stasis_operation_impl_register(stasis_op_impl_set_range()); stasis_operation_impl_register(stasis_op_impl_set_range_inverse()); diff --git a/src/stasis/operations/arrayList.c b/src/stasis/operations/arrayList.c index 6184b12..481aa20 100644 --- a/src/stasis/operations/arrayList.c +++ b/src/stasis/operations/arrayList.c @@ -140,7 +140,6 @@ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) { } /*----------------------------------------------------------------------------*/ - recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int recordSize) { pageid_t firstPage; @@ -162,9 +161,14 @@ recordid TarrayListAlloc(int xid, pageid_t count, int multiplier, int recordSize rid.slot = 0; /* number of slots in array (maxOffset + 1) */ rid.size = recordSize; Tupdate(xid, firstPage, &alp, sizeof(alp), OPERATION_ARRAY_LIST_HEADER_INIT); +#ifdef ARRAY_LIST_OLD_ALLOC for(pageid_t i = 0; i < count; i++) { TinitializeFixedPage(xid, firstPage+1+i, alp.size); } +#else + TinitializeFixedPageRange(xid, firstPage+1, count, alp.size); +#endif + return rid; } @@ -233,15 +237,17 @@ compensated_function int TarrayListExtend(int xid, recordid rid, int slots) { DEBUG("block %lld %lld %lld\n", (long long)i, (long long)newFirstPage, (long long)blockSize); tmp.slot = i + FIRST_DATA_PAGE_OFFSET; /* Iterate over the (large number) of new blocks, clearing their contents */ - /* @todo XXX arraylist generates N log entries initing pages. - It should generate 1 entry. (Need better LSN handling first.)*/ +#ifdef ARRRAY_LIST_OLD_ALLOC + // old way { for(pageid_t i = newFirstPage; i < newFirstPage + blockSize; i++) { TinitializeFixedPage(xid, i, alp.size); } } +#else + TinitializeFixedPageRange(xid, newFirstPage, blockSize, alp.size); +#endif TsetRaw(xid,tmp,&newFirstPage); - DEBUG("Tset: {%d, %d, %d} = %d\n", tmp.page, tmp.slot, tmp.size, newFirstPage); } diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index 277248f..a55b305 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -217,6 +217,52 @@ static int op_initialize_page(const LogEntry* e, Page* p) { return 0; } +typedef struct { + pageid_t firstPage; + pageid_t numPages; + pageid_t recordSize; // *has* to be smaller than a page; passed into TinitializeFixedPage() +} init_multipage_arg; + +static int op_init_multipage_impl(const LogEntry *e, Page *ignored) { + const init_multipage_arg* arg = stasis_log_entry_update_args_cptr(e); + + for(pageid_t i = 0; i < arg->numPages; i++) { + Page * p = loadPage(e->xid, arg->firstPage + i); + if(stasis_operation_multi_should_apply(e, p)) { + writelock(p->rwlatch, 0); + if(arg->recordSize) { + stasis_fixed_initialize_page(p, arg->recordSize, + stasis_fixed_records_per_page + (stasis_record_type_to_size(arg->recordSize))); + } else { + stasis_page_slotted_initialize_page(p); + } + stasis_page_lsn_write(e->xid, p, e->LSN); + unlock(p->rwlatch); + } + releasePage(p); + } + return 0; +} +int TinitializeSlottedPageRange(int xid, pageid_t start, pageid_t count) { + init_multipage_arg arg; + arg.firstPage = start; + arg.numPages = count; + arg.recordSize = 0; + + Tupdate(xid, MULTI_PAGEID, &arg, sizeof(arg), OPERATION_INITIALIZE_MULTIPAGE); + return 0; +} +int TinitializeFixedPageRange(int xid, pageid_t start, pageid_t count, size_t size) { + init_multipage_arg arg; + arg.firstPage = start; + arg.numPages = count; + arg.recordSize = size; + + Tupdate(xid, MULTI_PAGEID, &arg, sizeof(arg), OPERATION_INITIALIZE_MULTIPAGE); + return 0; +} + stasis_operation_impl stasis_op_impl_page_initialize() { stasis_operation_impl o = { OPERATION_INITIALIZE_PAGE, @@ -227,3 +273,13 @@ stasis_operation_impl stasis_op_impl_page_initialize() { }; return o; } +stasis_operation_impl stasis_op_impl_multipage_initialize() { + stasis_operation_impl o = { + OPERATION_INITIALIZE_MULTIPAGE, + MULTI_PAGE, + OPERATION_INITIALIZE_MULTIPAGE, + OPERATION_NOOP, + op_init_multipage_impl + }; + return o; +} diff --git a/stasis/constants.h b/stasis/constants.h index f77fab7..daf1ec0 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -140,11 +140,11 @@ terms specified in this license. // 18 #define OPERATION_NOOP 19 - -// 20 - +//20 #define OPERATION_ARRAY_LIST_HEADER_INIT 21 + #define OPERATION_INITIALIZE_PAGE 22 +#define OPERATION_INITIALIZE_MULTIPAGE 23 #define OPERATION_SET_RANGE 27 #define OPERATION_SET_RANGE_INVERSE 28 diff --git a/stasis/operations/pageOperations.h b/stasis/operations/pageOperations.h index efd9121..657801e 100644 --- a/stasis/operations/pageOperations.h +++ b/stasis/operations/pageOperations.h @@ -71,12 +71,27 @@ int TpageGet(int xid, pageid_t page, void* buf); void TinitializeSlottedPage(int xid, pageid_t page); void TinitializeFixedPage(int xid, pageid_t page, int slotLength); +/** + * Format a contiguous range of pages as slotted pages. This method writes a single, constant + * length log entry that does not contain the preimages of the pages in question. + * Therefore, it is only safe to call this function against newly allocated pages + * with uninteresting contents (such as those returned by TregionAlloc(). + * + * @see TinitializeFixedPageRange for an analogous function that initializes "fixed" pages. + */ +int TinitializeSlottedPageRange(int xid, pageid_t start, pageid_t count); +/** + * @see TinitializeSlottedPageRange for an analogous function, and a description of + * this function's non-standard impacts upon recovery. + */ +int TinitializeFixedPageRange(int xid, pageid_t start, pageid_t count, size_t size); int TpageGetType(int xid, pageid_t page); stasis_operation_impl stasis_op_impl_page_set_range(); stasis_operation_impl stasis_op_impl_page_set_range_inverse(); stasis_operation_impl stasis_op_impl_page_initialize(); +stasis_operation_impl stasis_op_impl_multipage_initialize(); stasis_operation_impl stasis_op_impl_fixed_page_alloc();