From e087c9c0735cffc577e2c3f5f84e437283d34324 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 18 Apr 2011 20:04:33 +0000 Subject: [PATCH] add alternative implementation of min aggregate for small sets. It avoids red black overheads, but some operations are linear in the number of values considered by the aggregate --- src/stasis/logger/safeWrites.c | 8 +- src/stasis/util/min.c | 115 ++++++++++++++++++-- src/stasis/util/ringbuffer.c | 12 +-- stasis/util/min.h | 8 +- test/stasis/CMakeLists.txt | 1 + test/stasis/check_min.c | 190 +++++++++++++++++++++++++++++++++ 6 files changed, 304 insertions(+), 30 deletions(-) create mode 100644 test/stasis/check_min.c diff --git a/src/stasis/logger/safeWrites.c b/src/stasis/logger/safeWrites.c index d9cdd30..5205393 100644 --- a/src/stasis/logger/safeWrites.c +++ b/src/stasis/logger/safeWrites.c @@ -818,12 +818,6 @@ static lsn_t firstLogEntry_LogWriter(stasis_log_t* log) { static void setTruncation_LogWriter(stasis_log_t* log, stasis_truncation_t *trunc) { // logwriter does not support hard limits on its size, so this is a no-op } -static int lsn_cmp(const void *ap, const void *bp, const void * ignored) { - lsn_t a = *(lsn_t*)ap; - lsn_t b = *(lsn_t*)bp; - - return (a < b) ? -1 : ((a == b) ? 0 : 1); -} stasis_log_t* stasis_log_safe_writes_open(const char * filename, int filemode, int fileperm, int softcommit) { @@ -910,7 +904,7 @@ stasis_log_t* stasis_log_safe_writes_open(const char * filename, sw->flushedLSN_wal = 0; sw->flushedLSN_commit = 0; sw->flushedLSN_internal = 0; - sw->minPending = stasis_aggregate_min_init(lsn_cmp); + sw->minPending = stasis_aggregate_min_init(0); /* Seek append only log to the end of the file. This is unnecessary, since the file was opened in append only mode, but it returns the diff --git a/src/stasis/util/min.c b/src/stasis/util/min.c index 0b1b5f9..99dfa66 100644 --- a/src/stasis/util/min.c +++ b/src/stasis/util/min.c @@ -1,27 +1,124 @@ #include #include #include +#include + +static int cmp_int64_t(const void *ap, const void *bp, const void *ign) { + int64_t a = *(int64_t*)ap; + int64_t b = *(int64_t*)bp; + + return (a < b) ? -1 : + (a > b) ? 1 : 0; +} struct stasis_aggregate_min_t { struct rbtree * tree; + int64_t ** vals; + int num_entries; + pthread_key_t key; + int64_t * memo; }; -stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void* a, const void *b, const void *c)) { +static void free_key(void * key) { + free(key); +} + +stasis_aggregate_min_t * stasis_aggregate_min_init(int large) { stasis_aggregate_min_t * ret = malloc(sizeof(*ret)); - ret->tree = rbinit(cmp,0); + if(large) { + ret->tree = rbinit(cmp_int64_t,0); + } else { + ret->tree = 0; + ret->vals = 0; + ret->num_entries = 0; + pthread_key_create(&ret->key, free_key); + ret->memo = 0; + } return ret; } void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min) { - rbdestroy(min->tree); + if(min->tree) { + rbdestroy(min->tree); + } else { + + } free(min); } -void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a) { - rbsearch(a, min->tree); +void stasis_aggregate_min_add(stasis_aggregate_min_t * min, int64_t * a) { + if(min->tree) { + rbsearch(a, min->tree); + } else { + if(min->memo) { + if(*min->memo > *a) { + min->memo = a; + } + } + int64_t * p = pthread_getspecific(min->key); + if(!p) { + p = malloc(sizeof(int64_t)); + *p = -1; + pthread_setspecific(min->key, p); + } + + if(*p != -1 && min->vals[*p] == 0) { min->vals[*p] = a; return; } + + for(int i = 0; i < min->num_entries; i++) { + if(!min->vals[i]) { + min->vals[i] = a; + *p = i; + return; + } + } + min->num_entries++; + min->vals = realloc(min->vals, min->num_entries * sizeof(int64_t**)); + *p = min->num_entries-1; + min->vals[*p] = a; + return; + } } -const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * a) { - return rbdelete(a, min->tree); +const int64_t * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, int64_t * a) { + if(min->tree) { + return rbdelete(a, min->tree); + } else { + if(min->memo && *min->memo == *a) { min->memo = NULL; } + int64_t * p = pthread_getspecific(min->key); + if(p /*key defined*/) { + if(*p != -1 /*key points to slot in array*/) { + if(min->vals[*p]/*slot in array points to something*/ ) { + if(*min->vals[*p] == *a) { + min->vals[*p] = 0; // clear array entry + return a; + } + } + } + } + for(int i = 0; i < min->num_entries; i++) { + if(min->vals[i] && (*min->vals[i] == *a)) { + int64_t * ret = min->vals[i]; + min->vals[i] = 0; + return ret; + } + } + abort(); + } } -const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min) { - return rbmin(min->tree); +const int64_t * stasis_aggregate_min_compute(stasis_aggregate_min_t * min) { if(min->tree) { + return (int64_t*)rbmin(min->tree); + } else { + if(min->memo) { + return min->memo; + } else { + for(int i = 0; i < min->num_entries; i++) { + if(min->vals[i]) { + if(!min->memo) { + min->memo = min->vals[i]; + } else if(*min->vals[i] < *min->memo) { + min->memo = min->vals[i]; + } + } + } + return min->memo; + } + } } diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c index db091ac..0e8556f 100644 --- a/src/stasis/util/ringbuffer.c +++ b/src/stasis/util/ringbuffer.c @@ -47,14 +47,6 @@ struct stasis_ringbuffer_t { pthread_cond_t write_done; }; -static int cmp_int64_t(const void *ap, const void *bp, const void *ign) { - int64_t a = *(int64_t*)ap; - int64_t b = *(int64_t*)bp; - - return (a < b) ? -1 : - (a > b) ? 1 : 0; -} - // Does not need synchronization (only called from nb function). static inline int64_t freespace(stasis_ringbuffer_t * ring) { int64_t ret = ((ring->rt - ring->wf) - 1) & ring->mask; @@ -235,8 +227,8 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, int64_t initial_offs ring->rt = ring->rf = ring->wt = ring->wf = 0; - ring->min_reader = stasis_aggregate_min_init(cmp_int64_t); - ring->min_writer = stasis_aggregate_min_init(cmp_int64_t); + ring->min_reader = stasis_aggregate_min_init(0); + ring->min_writer = stasis_aggregate_min_init(0); pthread_mutex_init(&ring->mut,0); pthread_cond_init(&ring->read_done,0); diff --git a/stasis/util/min.h b/stasis/util/min.h index 87bb02a..4d057a0 100644 --- a/stasis/util/min.h +++ b/stasis/util/min.h @@ -5,11 +5,11 @@ BEGIN_C_DECLS typedef struct stasis_aggregate_min_t stasis_aggregate_min_t; -stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void *a, const void *b, const void *c)); +stasis_aggregate_min_t * stasis_aggregate_min_init(int large); void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min); -void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a); -const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * b); -const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min); +void stasis_aggregate_min_add(stasis_aggregate_min_t * min, int64_t* a); +const int64_t * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, int64_t * b); +const int64_t * stasis_aggregate_min_compute(stasis_aggregate_min_t * min); END_C_DECLS #endif diff --git a/test/stasis/CMakeLists.txt b/test/stasis/CMakeLists.txt index 209c9c5..4d379f7 100644 --- a/test/stasis/CMakeLists.txt +++ b/test/stasis/CMakeLists.txt @@ -3,6 +3,7 @@ CREATE_CHECK(check_redblack) CREATE_CHECK(check_concurrentHash) CREATE_CHECK(check_lhtable) CREATE_CHECK(check_concurrentRingbuffer) +CREATE_CHECK(check_min) CREATE_CHECK(check_logEntry) CREATE_CHECK(check_logWriter) CREATE_CHECK(check_page) diff --git a/test/stasis/check_min.c b/test/stasis/check_min.c new file mode 100644 index 0000000..203790d --- /dev/null +++ b/test/stasis/check_min.c @@ -0,0 +1,190 @@ +/*--- +This software is copyrighted by the Regents of the University of +California, and other parties. The following terms apply to all files +associated with the software unless explicitly disclaimed in +individual files. + +The authors hereby grant permission to use, copy, modify, distribute, +and license this software and its documentation for any purpose, +provided that existing copyright notices are retained in all copies +and that this notice is included verbatim in any distributions. No +written agreement, license, or royalty fee is required for any of the +authorized uses. Modifications to this software may be copyrighted by +their authors and need not follow the licensing terms described here, +provided that the new terms are clearly indicated on the first page of +each file where they apply. + +IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY +FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES +ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY +DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND +NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND +THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE +MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + +GOVERNMENT USE: If you are acquiring this software on behalf of the +U.S. government, the Government shall have only "Restricted Rights" in +the software and related documentation as defined in the Federal +Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are +acquiring the software on behalf of the Department of Defense, the +software shall be classified as "Commercial Computer Software" and the +Government shall have only "Restricted Rights" as defined in Clause +252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the +authors grant the U.S. Government and others acting in its behalf +permission to use and distribute the software in accordance with the +terms specified in this license. +---*/ +#include "../check_includes.h" + +#include + +#include +#include +#include + +#define LOG_NAME "check_min.log" + +/** + @test + +*/ + +START_TEST(minSmokeTest) { + stasis_aggregate_min_t * small = stasis_aggregate_min_init(0); + stasis_aggregate_min_t * large = stasis_aggregate_min_init(1); + + int64_t i1 = 1; + int64_t i2 = 2; + int64_t i3 = 3; + int64_t i4 = 4; + stasis_aggregate_min_add(small, &i2); + stasis_aggregate_min_add(large, &i2); + assert(2 == * stasis_aggregate_min_compute(large)); + assert(2 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_remove(small, &i2); + stasis_aggregate_min_remove(large, &i2); + assert(! stasis_aggregate_min_compute(large)); + assert(! stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_add(small, &i1); + stasis_aggregate_min_add(large, &i1); + + assert(1 == * stasis_aggregate_min_compute(large)); + assert(1 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_add(small, &i3); + stasis_aggregate_min_add(large, &i3); + + assert(1 == * stasis_aggregate_min_compute(large)); + assert(1 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_remove(small, &i1); + stasis_aggregate_min_remove(large, &i1); + + assert(3 == * stasis_aggregate_min_compute(large)); + assert(3 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_add(small, &i1); + stasis_aggregate_min_add(large, &i1); + + assert(1 == * stasis_aggregate_min_compute(large)); + assert(1 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_add(small, &i2); + stasis_aggregate_min_add(large, &i2); + + assert(1 == * stasis_aggregate_min_compute(large)); + assert(1 == * stasis_aggregate_min_compute(small)); + + stasis_aggregate_min_remove(small, &i1); + stasis_aggregate_min_remove(large, &i1); + + assert(2 == * stasis_aggregate_min_compute(large)); + assert(2 == * stasis_aggregate_min_compute(small)); + +} END_TEST + +START_TEST(minRandomTest) { + stasis_aggregate_min_t * a = stasis_aggregate_min_init(0); + stasis_aggregate_min_t * b = stasis_aggregate_min_init(0); + const int COUNT = 10000; + + int64_t * vals = malloc(sizeof(int64_t) * COUNT); + int64_t * bits = malloc(sizeof(int64_t) * COUNT); + for(int i = 0; i < COUNT; i++) { + vals[i] = i; + bits[i] = 0; + } + for(int i = 0; i < COUNT; i++) { + if(! (i & 1023)) { printf("%d\n", i); } + switch(myrandom(3)) { + case 0: + { + int j; + int tries = 0; + while((j = myrandom(i))) { + if(!bits[j]) { + bits[j] = 1; + + stasis_aggregate_min_add(a, &vals[j]); + stasis_aggregate_min_add(b, &vals[j]); + + break; + } + tries ++; + if(tries == 100) break; + } + } break; + case 1: + { + int j; + int tries = 0; + while((j = myrandom(i))) { + if(bits[j]) { + bits[j] = 0; + + stasis_aggregate_min_remove(a, &vals[j]); + stasis_aggregate_min_remove(b, &vals[j]); + + break; + } + tries ++; + if(tries == 100) break; + } + } break; + case 2: + { + int64_t * ap = stasis_aggregate_min_compute(a); + int64_t * bp = stasis_aggregate_min_compute(b); + assert(ap == bp); + } break; + } + } +} END_TEST + +Suite * check_suite(void) { + Suite *s = suite_create("ringBuffer"); + /* Begin a new test */ + TCase *tc = tcase_create("ringBuffer"); + + /* Sub tests are added, one per line, here */ + + tcase_add_test(tc, minSmokeTest); + tcase_add_test(tc, minRandomTest); + + /* --------------------------------------------- */ + + tcase_add_checked_fixture(tc, setup, teardown); + + + suite_add_tcase(s, tc); + return s; +} + +#include "../check_setup.h"