in-memory group by implementation

This commit is contained in:
Sears Russell 2009-07-01 01:33:24 +00:00
parent 5efebbbb10
commit b66328192c
10 changed files with 265 additions and 3 deletions

View file

@ -24,6 +24,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c
operations/alloc.c operations/noop.c
operations/arrayList.c
operations/lsnFreeSet.c
operations/group/logStructured.c
hash.c
operations/naiveLinearHash.c
operations/linearHashNTA.c

View file

@ -23,6 +23,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c rw.c doubleLinkedList.c common
operations/pageOrientedListNTA.c \
operations/regions.c operations/lsmTree.c \
operations/lsnFreeSet.c \
operations/group/logStructured.c \
io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \
io/debug.c io/handle.c \
bufferManager.c \

View file

@ -5,6 +5,7 @@
#include <stasis/operations.h>
#include <stasis/arrayCollection.h>
#include <stasis/logger/logMemory.h>
#include <stasis/operations/group.h>
static lladdIterator_def_t iterators[MAX_ITERATOR_TYPES];
@ -42,6 +43,8 @@ void iterator_init() {
lladdFifoPool_iterator_tupleDone,
};
lladdIterator_register(POINTER_ITERATOR, pointer_def);
stasis_log_structured_group_init();
}

View file

@ -0,0 +1,124 @@
/*
* logStructured.c
*
* Created on: Jun 28, 2009
* Author: sears
*/
#include<stasis/transactional.h>
#include<stasis/lhtable.h>
#include<string.h>
/**
 * Private state stored in stasis_group_t.impl by TlogStructuredGroup().
 */
typedef struct {
// Hash table keyed by the tuple key; each value is a
// stasis_log_structured_group_entry_t holding that key's values.
struct LH_ENTRY(table) * table;
// Set to NULLRID at creation.  NOTE(review): no code visible here reads it.
recordid runs;
// Limit, in bytes, taken from TlogStructuredGroup()'s runlen argument;
// put() aborts once curlen reaches it.
size_t runlen;
// Running total of keylen + vallen over every put() call so far.
size_t curlen;
} stasis_log_structured_group_t;
/**
 * All values that have been put() under a single key.
 */
typedef struct {
// Array of count pointers, each to a malloc()ed copy of one value.
byte ** val;
// Parallel array: vallen[i] is the length in bytes of val[i].
size_t * vallen;
// Number of valid slots in val and vallen.
size_t count;
} stasis_log_structured_group_entry_t;
/**
 * Cursor state for the iterator returned by the group's done() callback.
 */
typedef struct {
// List cursor over the group's hash table (opened with LH_ENTRY(openlist)).
struct LH_ENTRY(list) entries;
// Hash table pair the cursor is currently positioned on; 0 before the first
// call to next and once the list is exhausted.
const struct LH_ENTRY(pair_t)* entry;
// Index of the current value within the current pair's entry (val/vallen).
size_t offset;
} stasis_log_structured_group_iterator;
/**
 * Return a malloc()ed copy of len bytes starting at src.
 * Aborts if a non-empty copy cannot be allocated.
 */
static byte * stasis_log_structured_group_memdup(const byte* src, size_t len) {
  byte * copy = malloc(len);
  if(len) {
    if(!copy) { abort(); }
    memcpy(copy, src, len);
  }
  return copy;
}

/**
 * Add one (key, value) tuple to the in-memory group.
 *
 * The value is copied and appended to the list of values stored under key.
 * The key itself is only copied the first time it is seen; earlier versions
 * copied it on every call and leaked the copy whenever the key already
 * existed.
 *
 * @param impl   Group handle whose impl field points at a
 *               stasis_log_structured_group_t.
 * @param key    Key bytes.
 * @param keylen Length of key, in bytes.
 * @param val    Value bytes.
 * @param vallen Length of val, in bytes.
 * @return 0 on success.  Calls abort() if the running total of key and value
 *         bytes reaches the group's run length, or if memory is exhausted.
 */
static int stasis_log_structured_group_put(struct stasis_group_t* impl,
    byte* key, size_t keylen, byte* val, size_t vallen) {
  stasis_log_structured_group_t * g = impl->impl;

  // Enforce the size limit before allocating anything for this tuple.
  g->curlen += (keylen + vallen);
  if(g->curlen >= g->runlen) {
    abort();
  }

  byte * v = stasis_log_structured_group_memdup(val, vallen);

  stasis_log_structured_group_entry_t * entry =
      LH_ENTRY(find)(g->table, key, keylen);
  if(entry) {
    // Existing key: grow both parallel arrays by one slot.  Use temporaries
    // so the old arrays are not lost if realloc() fails.
    size_t newcount = entry->count + 1;
    byte ** vals = realloc(entry->val, sizeof(entry->val[0]) * newcount);
    if(!vals) { abort(); }
    entry->val = vals;
    size_t * lens = realloc(entry->vallen, sizeof(entry->vallen[0]) * newcount);
    if(!lens) { abort(); }
    entry->vallen = lens;
    entry->count = newcount;
  } else {
    // New key: create a one-element entry and register it under a private
    // copy of the key.
    // NOTE(review): if LH_ENTRY(insert) copies the key internally, k should
    // be freed after the insert -- confirm against lhtable.
    byte * k = stasis_log_structured_group_memdup(key, keylen);
    entry = malloc(sizeof(*entry));
    if(!entry) { abort(); }
    entry->val = malloc(sizeof(entry->val[0]));
    entry->vallen = malloc(sizeof(entry->vallen[0]));
    if(!entry->val || !entry->vallen) { abort(); }
    entry->count = 1;
    LH_ENTRY(insert)(g->table, k, keylen, entry);
  }

  entry->vallen[entry->count-1] = vallen;
  entry->val[entry->count-1] = v;
  return 0;
}
/**
 * Finish adding tuples and return an iterator over the grouped contents.
 *
 * The iterator walks the group's hash table; it starts positioned before the
 * first tuple, so the caller must advance it before reading a key or value.
 *
 * @param impl Group handle whose impl field points at a
 *             stasis_log_structured_group_t.
 * @return A newly allocated iterator of type
 *         STASIS_LOG_STRUCTURED_GROUP_ITERATOR.
 */
static lladdIterator_t* stasis_log_structured_group_done(stasis_group_t* impl) {
  stasis_log_structured_group_t * g = impl->impl;

  stasis_log_structured_group_iterator * it = malloc(sizeof(*it));
  LH_ENTRY(openlist)(g->table, &it->entries);
  it->entry = 0;
  // Placeholder only: the first call to next sees entry == 0 and resets the
  // offset to 0 before it is ever read.
  it->offset = (size_t)-1;

  // Size the wrapper from its own type; this used to be sizeof(*it).
  lladdIterator_t * ret = malloc(sizeof(*ret));
  ret->type = STASIS_LOG_STRUCTURED_GROUP_ITERATOR;
  ret->impl = it;
  return ret;
}
/**
 * Create an empty in-memory group-by handle.
 *
 * @param xid    Transaction id; not used by this implementation.
 * @param runlen Limit on the total number of key and value bytes the group
 *               will accept before put() aborts.
 * @return A newly allocated handle whose put and done callbacks are the
 *         log-structured group implementations.
 */
stasis_group_t * TlogStructuredGroup(int xid, size_t runlen) {
  stasis_group_t * handle = malloc(sizeof(*handle));
  stasis_log_structured_group_t * state = malloc(sizeof(*state));

  // Private state: an empty hash table and a zeroed byte counter.
  state->table  = LH_ENTRY(create)(100);
  state->runs   = NULLRID;
  state->runlen = runlen;
  state->curlen = 0;

  // Public handle: callbacks plus a pointer to the private state.
  handle->impl = state;
  handle->done = stasis_log_structured_group_done;
  handle->put  = stasis_log_structured_group_put;
  return handle;
}
static void stasis_log_structured_group_it_close(int xid, void* impl) {
stasis_log_structured_group_iterator * it = impl;
LH_ENTRY(closelist)(&it->entries);
free(it);
}
/**
 * Iterator next callback: advance to the next (key, value) tuple.
 *
 * Values stored under the same key are visited one after another; once the
 * last value of the current key has been visited (or before the first call),
 * the cursor moves on to the next pair in the hash table list.
 *
 * @param xid  Transaction id; unused.
 * @param impl The stasis_log_structured_group_iterator to advance.
 * @return Non-zero if the iterator is positioned on a tuple, 0 at the end.
 */
static int stasis_log_structured_group_it_next(int xid, void* impl) {
  stasis_log_structured_group_iterator * cursor = impl;
  stasis_log_structured_group_entry_t * group =
      cursor->entry ? cursor->entry->value : 0;

  if(group && cursor->offset != (group->count-1)) {
    // More values remain under the current key.
    cursor->offset++;
  } else {
    // Not started yet, or the current key is exhausted: fetch the next pair.
    // NOTE(review): the rest of this file goes through LH_ENTRY(...); confirm
    // that lhreadlist is the same function as LH_ENTRY(readlist).
    cursor->entry = lhreadlist(&cursor->entries);
    cursor->offset = 0;
  }
  return cursor->entry != 0;
}
/**
 * Iterator key callback: expose the key of the current tuple.
 *
 * @param xid  Transaction id; unused.
 * @param impl The stasis_log_structured_group_iterator; it must be positioned
 *             on a tuple.
 * @param key  Out parameter; set to point at the key bytes held by the hash
 *             table pair (not a copy).
 * @return The key's length as stored in the pair.
 */
static int stasis_log_structured_group_it_key(int xid, void* impl, byte** key) {
  const struct LH_ENTRY(pair_t) * pair =
      ((stasis_log_structured_group_iterator *)impl)->entry;

  *key = (byte*) pair->key; // TODO cast strips const
  return pair->keyLength;
}
/**
 * Iterator value callback: expose the value of the current tuple.
 *
 * @param xid  Transaction id; unused.
 * @param impl The stasis_log_structured_group_iterator; it must be positioned
 *             on a tuple.
 * @param val  Out parameter; set to point at the stored value bytes (not a
 *             copy).
 * @return The length recorded for that value.
 */
static int stasis_log_structured_group_it_value(int xid, void* impl, byte** val) {
  stasis_log_structured_group_iterator * cursor = impl;
  stasis_log_structured_group_entry_t * group = cursor->entry->value;
  size_t idx = cursor->offset;

  *val = group->val[idx];
  return group->vallen[idx];
}
/**
 * Iterator tupleDone callback: intentionally a no-op for this iterator.
 *
 * @param xid  Transaction id; unused.
 * @param impl Iterator state; unused.
 */
static void stasis_log_structured_group_it_tupleDone(int xid, void* impl) {
  (void)xid;
  (void)impl;
}
/**
 * Register the log-structured group iterator's callbacks under
 * STASIS_LOG_STRUCTURED_GROUP_ITERATOR.
 */
void stasis_log_structured_group_init() {
  // NOTE(review): the second and third slots deliberately share it_next;
  // presumably they are the blocking and non-blocking "next" callbacks --
  // confirm against the definition of lladdIterator_def_t.
  static lladdIterator_def_t group_iterator_def = {
    stasis_log_structured_group_it_close,
    stasis_log_structured_group_it_next,
    stasis_log_structured_group_it_next,
    stasis_log_structured_group_it_key,
    stasis_log_structured_group_it_value,
    stasis_log_structured_group_it_tupleDone
  };

  lladdIterator_register(STASIS_LOG_STRUCTURED_GROUP_ITERATOR,
                         group_iterator_def);
}

View file

@ -257,6 +257,7 @@ static const short SLOT_TYPE_LENGTHS[] = { -1, -1, sizeof(blob_record_t), -1};
#define ARRAY_ITERATOR 1
#define LOG_MEMORY_ITERATOR 2
#define POINTER_ITERATOR 3
#define STASIS_LOG_STRUCTURED_GROUP_ITERATOR 4
#define USER_DEFINED_ITERATOR 10

View file

@ -149,7 +149,7 @@ typedef struct {
#include "operations/regions.h"
#include "operations/lsmTree.h"
#include "operations/lsnFreeSet.h"
#include "operations/group.h"
/**
Initialize stasis' operation table.
*/

21
stasis/operations/group.h Normal file
View file

@ -0,0 +1,21 @@
/*
* group.h
*
* Created on: Jun 30, 2009
* Author: sears
*/
#ifndef GROUP_H_
#define GROUP_H_
#include<stasis/iterator.h>
/**
 * Handle for a group-by operator.  Callers feed tuples in through put and
 * then call done to obtain an iterator over the result; both callbacks take
 * the handle itself as their first argument.
 */
typedef struct stasis_group_t {
// Add one (key, value) tuple to the group.  The log-structured
// implementation returns 0 on success.
int (*put)(struct stasis_group_t * impl, byte* key, size_t keylen, byte* val, size_t vallen);
// Finish adding tuples and return an iterator over the grouped contents.
lladdIterator_t * (*done)(struct stasis_group_t *impl);
// Implementation-private state, owned by whichever constructor built the handle.
void* impl;
} stasis_group_t;
/**
 * Create an in-memory group-by handle.
 *
 * @param xid    Transaction id.
 * @param runlen Limit on the total number of key and value bytes accepted.
 * @return A newly allocated handle.
 */
stasis_group_t * TlogStructuredGroup(int xid, size_t runlen);
/**
 * Register the iterator type returned by the log-structured group's done
 * callback.  Called from iterator_init().
 */
void stasis_log_structured_group_init();
#endif /* GROUP_H_ */

View file

@ -28,4 +28,5 @@ CREATE_CHECK(check_io)
CREATE_CHECK(check_rangeTracker)
CREATE_CHECK(check_replacementPolicy)
CREATE_CHECK(check_lsmTree)
CREATE_CHECK(check_groupBy)
# check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_logicalLinearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager check_compensations check_errorHandling check_ringbuffer check_iterator check_multiplexer check_bTree check_regions check_allocationPolicy check_io check_rangeTracker check_replacementPolicy check_lsmTree)

View file

@ -1,9 +1,9 @@
# INCLUDES = @CHECK_CFLAGS@
## Had to disable check_lht because lht needs to be rewritten.
TESTS = check_lhtable check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager check_compensations check_errorHandling check_ringbuffer check_iterator check_multiplexer check_bTree check_regions check_allocationPolicy check_io check_rangeTracker check_replacementPolicy check_lsmTree
TESTS = check_lhtable check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager check_compensations check_errorHandling check_ringbuffer check_iterator check_multiplexer check_bTree check_regions check_allocationPolicy check_io check_rangeTracker check_replacementPolicy check_lsmTree check_groupBy
noinst_PROGRAMS = $(TESTS)
LDADD = $(top_builddir)/src/stasis/libstasis.la
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log check_lockManager.log check_compensations.log check_errorhandling.log check_header.log check_iterator.log check_linearHash.log check_ringbuffer.log check_bTree.log
CLEANFILES = check_lht.log check_logEntry.log storefile.txt logfile.txt blob0_file.txt blob1_file.txt check_blobRecovery.log check_logWriter.log check_operations.log check_recovery.log check_transactional2.log check_page.log check_bufferManager.log check_indirect.log check_bufferMananger.log check_lladdhash.log check_pageOperations.log check_linearhash.log check_linkedListNTA.log check_linearHashNTA.log check_pageOrientedListNTA.log check_lockManager.log check_compensations.log check_errorhandling.log check_header.log check_iterator.log check_linearHash.log check_ringbuffer.log check_bTree.log check_groupBy.log
AM_CFLAGS=${GLOBAL_CFLAGS}

110
test/stasis/check_groupBy.c Normal file
View file

@ -0,0 +1,110 @@
/*---
This software is copyrighted by the Regents of the University of
California, and other parties. The following terms apply to all files
associated with the software unless explicitly disclaimed in
individual files.
The authors hereby grant permission to use, copy, modify, distribute,
and license this software and its documentation for any purpose,
provided that existing copyright notices are retained in all copies
and that this notice is included verbatim in any distributions. No
written agreement, license, or royalty fee is required for any of the
authorized uses. Modifications to this software may be copyrighted by
their authors and need not follow the licensing terms described here,
provided that the new terms are clearly indicated on the first page of
each file where they apply.
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
GOVERNMENT USE: If you are acquiring this software on behalf of the
U.S. government, the Government shall have only "Restricted Rights" in
the software and related documentation as defined in the Federal
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
acquiring the software on behalf of the Department of Defense, the
software shall be classified as "Commercial Computer Software" and the
Government shall have only "Restricted Rights" as defined in Clause
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
authors grant the U.S. Government and others acting in its behalf
permission to use and distribute the software in accordance with the
terms specified in this license.
---*/
#include "../check_includes.h"
#include <stasis/transactional.h>
#include <assert.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <sys/time.h>
#include <time.h>
#define LOG_NAME "check_groupBy.log"
/**
 * Smoke test for the in-memory group-by.
 *
 * Inserts 100 values under each of 10000 integer keys, then walks the
 * resulting iterator and checks that tuples sharing a key come out
 * contiguously, that every group holds exactly 100 tuples, and that the
 * totals match what was inserted.
 */
START_TEST(groupBySmokeTest) {
  enum { NUM_KEYS = 10000, VALS_PER_KEY = 100 };

  Tinit();
  int xid = Tbegin();
  stasis_group_t * handle = TlogStructuredGroup(xid, 1024*1024*40);
  for(int i =0; i < NUM_KEYS; i++) {
    for(int j = 0; j < VALS_PER_KEY; j++) {
      int val = i * VALS_PER_KEY + j;
      handle->put(handle, (byte*)&i, sizeof(i), (byte*)&val, sizeof(val));
    }
  }
  Tcommit(xid);
  Tdeinit();
  // NOTE(review): the iterator is created and consumed after Tcommit() and
  // Tdeinit(), and is still handed the committed xid -- confirm this ordering
  // is intended.
  lladdIterator_t * it = handle->done(handle);
  int numGroups = 0;
  int numTups = 0;
  int groupSize = -1;   // -1 until the first tuple has been seen
  int oldj = -1;        // no inserted key is negative, so -1 never matches
  while(Titerator_next(xid, it)) {
    int *j;
    Titerator_key(xid, it, (byte**)&j);
    // Read the key before signalling tupleDone, in case an iterator
    // implementation invalidates the key pointer at that point.
    int key = *j;
    Titerator_tupleDone(xid, it);
    if(key == oldj) {
      groupSize++;
    } else {
      // A new key starts: the group that just ended must have been complete.
      assert(groupSize == -1 || groupSize == VALS_PER_KEY);
      numGroups++;
      groupSize = 1;
      oldj = key;
    }
    numTups++;
  }
  assert(numGroups == NUM_KEYS);
  // The loop only checks a group when the next one begins, so check the
  // final group explicitly.
  assert(groupSize == VALS_PER_KEY);
  // This used to be an assignment (=), which made the check always pass.
  assert(numTups == numGroups * VALS_PER_KEY);
} END_TEST
/**
 * Build the Check suite for the group-by tests.
 *
 * @return A suite containing a single test case, "simple", which runs
 *         groupBySmokeTest with a 20 minute timeout.
 */
Suite * check_suite(void) {
  // The suite name used to be "linearHashNTA", copied from another test file.
  Suite *s = suite_create("groupBy");
  /* Begin a new test */
  TCase *tc = tcase_create("simple");
  tcase_set_timeout(tc, 1200); // 20 minute timeout
  /* Sub tests are added, one per line, here */
  tcase_add_test(tc, groupBySmokeTest);
  /* --------------------------------------------- */
  tcase_add_checked_fixture(tc, setup, teardown);
  suite_add_tcase(s, tc);
  return s;
}
#include "../check_setup.h"