Initial checkin of relational algebra implementation.

This commit is contained in:
Sears Russell 2008-03-02 23:43:38 +00:00
parent bb1ef5e760
commit 54055cd3e7
10 changed files with 1285 additions and 1 deletions

View file

@ -225,6 +225,7 @@ AC_CONFIG_FILES([Makefile
src/apps/cht/Makefile src/apps/cht/Makefile
src/apps/readOnlyHash/Makefile src/apps/readOnlyHash/Makefile
src/apps/cyrus/Makefile src/apps/cyrus/Makefile
src/apps/referential/Makefile
src/libdfa/Makefile src/libdfa/Makefile
src/stasis/Makefile src/stasis/Makefile
src/pobj/Makefile src/pobj/Makefile

View file

@ -1 +1 @@
SUBDIRS = cht readOnlyHash #cyrus SUBDIRS = cht readOnlyHash referential #cyrus

View file

@ -0,0 +1,5 @@
# Build the "referential" library from the relational algebra (algebra.c)
# and data manipulation (dml.c) sources.
ADD_LIBRARY(referential algebra.c dml.c)
# Put the new library in front of the libraries already in COMMON_LIBRARIES.
SET(COMMON_LIBRARIES referential ${COMMON_LIBRARIES})
# Only when CHECK_LIBRARY is set: register "toplevel" together with the
# reference test script.
# NOTE(review): CREATE_CHECK_OPT is a project macro defined elsewhere;
# presumably it builds the program and runs it with the given argument -- confirm.
IF(CHECK_LIBRARY)
CREATE_CHECK_OPT(toplevel ${CMAKE_CURRENT_SOURCE_DIR}/test-script.ref)
ENDIF(CHECK_LIBRARY)

View file

@ -0,0 +1,4 @@
# Libraries every program in this directory is linked against.
LDADD=$(top_builddir)/src/stasis/libstasis.la $(top_builddir)/src/libdfa/librw.la
# The interpreter/server is built directly from the three sources.
toplevel_SOURCES=toplevel.c algebra.c dml.c
# noinst_: the program is built but not installed.
noinst_PROGRAMS=toplevel
# gnu99 is needed for the GNU/POSIX extensions the sources use
# (asprintf, getline, strdup).
AM_CFLAGS=-g -Wall -pedantic -std=gnu99

View file

@ -0,0 +1,802 @@
#define _GNU_SOURCE
#include <stdio.h>
#include <stasis/transactional.h>
#include <string.h>
#include <ctype.h>
#include "algebra.h"
#include <stdlib.h>
#include <errno.h>
#define SELECT_ITERATOR (USER_DEFINED_ITERATOR+1)
#define PROJECT_ITERATOR (USER_DEFINED_ITERATOR+2)
#define KEYVAL_ITERATOR (USER_DEFINED_ITERATOR+3)
#define JOIN_ITERATOR (USER_DEFINED_ITERATOR+4)
/* static void ts_close(int xid, void * it) {
}
static int ts_next(int xid, void * it) {
}
static int ts_tryNext(int xid, void * it) {
}
static int ts_key(int xid, void * it, byte ** key) {
}
static int ts_value(int xid, void * it, byte ** val) {
}
static void ts_tupleDone(int xid, void * it) {
}
static void ts_releaseLock(int xid, void *it) {
} */
/*static const lladdIterator_def_t ts_it = {
ts_close, ts_next, ts_tnext, ts_key, ts_value, ts_tupleDone, noopTupDone
}; */
/**
 * Split a string into tokens separated by any of the characters in delim.
 *
 * Runs of delimiters count as a single separator, so empty tokens are never
 * produced.  The input is not modified; the tokens point into a private
 * copy of it.
 *
 * Unlike strtok(), this keeps no hidden static state, so it is safe to call
 * from several threads at once.
 *
 * @param in     NUL-terminated string to split.
 * @param freeme Out: the private copy that backs the tokens.  The caller
 *               must free() it once the tokens are no longer needed.
 * @param count  Out: number of tokens found.
 * @param delim  Set of delimiter characters.
 * @return A malloc()ed, NULL-terminated array of *count token pointers.
 *         The caller must free() the array itself (not the tokens).
 *         Aborts if memory cannot be allocated.
 */
char ** split(char * in, char ** freeme, int* count, char * delim) {
  size_t len = strlen(in);
  char * copy = malloc(len + 1);
  if(!copy) {
    perror("split");
    abort();
  }
  memcpy(copy, in, len + 1);

  /* Pass 1: count the tokens so the result array is allocated only once. */
  int ntok = 0;
  for(const char * p = copy + strspn(copy, delim); *p; p += strspn(p, delim)) {
    ntok++;
    p += strcspn(p, delim);
  }

  char ** ret = malloc(sizeof *ret * ((size_t)ntok + 1));
  if(!ret) {
    perror("split");
    abort();
  }

  /* Pass 2: terminate each token in place and record where it starts. */
  char * cur = copy + strspn(copy, delim);
  for(int i = 0; i < ntok; i++) {
    ret[i] = cur;
    cur += strcspn(cur, delim);
    if(*cur) {
      *cur++ = '\0';
      cur += strspn(cur, delim);
    }
  }
  ret[ntok] = 0;

  *freeme = copy;
  *count = ntok;
  return ret;
}
/**
 * Format a recordid as the text tuple "page,slot,size".
 *
 * @param rid Record id to format.
 * @return A newly allocated string; the caller must free() it.
 *         Aborts if the string cannot be allocated, because asprintf()
 *         leaves its output pointer undefined on failure.
 */
char * tplRid(recordid rid) {
  char * ret = 0;
  if(asprintf(&ret, "%d,%d,%lld", rid.page, rid.slot, (long long)rid.size) < 0) {
    perror("Couldn't format rid");
    abort();
  }
  return ret;
}
/**
 * Parse a "page,slot,size" tuple (as produced by tplRid) into a recordid.
 *
 * Fields may be separated by commas and/or spaces.  The input is validated
 * with explicit checks rather than assert(), so a malformed tuple is
 * rejected even when NDEBUG is defined.
 *
 * @param tpl NUL-terminated tuple text; not modified.
 * @return The parsed recordid.  Aborts if the tuple does not consist of
 *         exactly three decimal integers.
 */
recordid ridTpl(char * tpl) {
  int count;
  char * freeme;
  char ** tok = split(tpl, &freeme, &count, ", ");
  if(count != 3) {
    fprintf(stderr, "Couldn't parse rid: expected 3 fields, found %d\n", count);
    abort();
  }

  long long field[3];
  for(int i = 0; i < 3; i++) {
    char * end;
    errno = 0;
    /* strtoll matches the %lld that tplRid uses for the size field. */
    field[i] = strtoll(tok[i], &end, 10);
    if(errno) {
      perror("Couldn't parse rid");
      abort();
    }
    if(end == tok[i] || *end != '\0') {
      fprintf(stderr, "Couldn't parse rid: bad field '%s'\n", tok[i]);
      abort();
    }
  }

  recordid ret;
  ret.page = field[0];
  ret.slot = field[1];
  ret.size = field[2];

  free(freeme);
  free(tok);
  return ret;
}
/**
 * Deep-copy a NULL-terminated array of strings.
 *
 * @param tup NULL-terminated array of NUL-terminated strings.
 * @return A newly allocated array holding newly allocated copies of every
 *         string, terminated by NULL.  Release it with tplFree().
 *         Aborts if memory cannot be allocated.
 */
char ** tplDup(char ** tup) {
  size_t n = 0;
  while(tup[n]) { n++; }

  /* The elements are char*, so size the array from the pointer itself. */
  char ** ret = malloc(sizeof *ret * (n + 1));
  if(!ret) {
    perror("tplDup");
    abort();
  }
  for(size_t i = 0; i < n; i++) {
    size_t bytes = strlen(tup[i]) + 1;
    ret[i] = malloc(bytes);
    if(!ret[i]) {
      perror("tplDup");
      abort();
    }
    memcpy(ret[i], tup[i], bytes);
  }
  ret[n] = 0;
  return ret;
}
/**
 * Release a NULL-terminated array of heap-allocated strings: every string
 * is freed first, then the array itself.
 */
void tplFree(char ** tup) {
  char ** cursor = tup;
  while(*cursor) {
    free(*cursor);
    cursor++;
  }
  free(tup);
}
/**
 * Open an iterator over the hash table that backs a named table.
 *
 * The catalog maps a table name (including its NUL) to the text form of the
 * table's recordid, which is decoded with ridTpl().
 *
 * @param xid       Transaction id passed through to the hash calls.
 * @param catalog   Hash that maps table names to recordid tuples.
 * @param tablename Name of the table to scan.
 * @return The iterator returned by ThashGenericIterator(), or 0 (after
 *         printing a message) if the catalog has no such table.
 */
lladdIterator_t* ReferentialAlgebra_OpenTableScanner(int xid, recordid catalog,
                                                     char * tablename) {
  char * entry;
  size_t entryLen = ThashLookup(xid, catalog, (byte*)tablename,
                                strlen(tablename)+1, (byte**)&entry);
  if(entryLen != -1) {
    assert(entryLen == strlen(entry)+1);
    recordid tableRid = ridTpl(entry);
    lladdIterator_t * scanner = ThashGenericIterator(xid, tableRid);
    free(entry);
    return scanner;
  }
  printf("Unknown table %s\n", tablename);
  return 0;
}
/* Parsed selection predicate.
   NOTE(review): nothing in the visible part of this file refers to this
   type; ReferentialAlgebra_Select takes a plain char** instead. */
typedef struct select_predicate {
char ** vals;   /* presumably the predicate column patterns -- confirm */
int count;      /* presumably the number of entries in vals -- confirm */
void * freeme;  /* presumably the buffer that backs vals -- confirm */
} select_predicate;
/* State of a SELECT_ITERATOR. */
typedef struct select_impl {
lladdIterator_t * it;  /* child iterator whose tuples are filtered */
char ** p;             /* NULL-terminated predicate; released with tplFree() in s_close */
} select_impl;
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// KEY VALUE TABLE FORMAT ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
/* State of a KEYVAL_ITERATOR, which presents the child's key and value as
   a single "key,value" string. */
typedef struct kv_impl {
char * catted;         /* cached string built by mkCatted for the current tuple, or 0 */
lladdIterator_t * it;  /* child iterator */
} kv_impl;
/**
 * Wrap an iterator in a KEYVAL_ITERATOR.
 *
 * @param xid Unused here.
 * @param it  Child iterator; the wrapper closes it in kv_close.
 * @return A newly allocated iterator of type KEYVAL_ITERATOR whose state
 *         starts without a cached string.
 */
lladdIterator_t* ReferentialAlgebra_KeyValCat(int xid, lladdIterator_t * it) {
  lladdIterator_t * wrapper = malloc(sizeof(lladdIterator_t));
  kv_impl * state = malloc(sizeof(kv_impl));

  state->it = it;
  state->catted = 0;

  wrapper->impl = state;
  wrapper->type = KEYVAL_ITERATOR;
  return wrapper;
}
/* Close the child iterator, then release the cached string and the state. */
static void kv_close(int xid, void * it) {
  kv_impl * state = it;
  Titerator_close(xid, state->it);
  free(state->catted);   /* free(NULL) is a no-op */
  free(state);
}
/* Drop the string cached for the previous tuple and advance the child. */
static int kv_next(int xid, void * it) {
  kv_impl * state = it;
  free(state->catted);   /* free(NULL) is a no-op */
  state->catted = 0;
  return Titerator_next(xid, state->it);
}
/* Same as kv_next, but forwards to the child's tryNext. */
static int kv_tryNext(int xid, void * it) {
  kv_impl * state = it;
  free(state->catted);   /* free(NULL) is a no-op */
  state->catted = 0;
  return Titerator_tryNext(xid, state->it);
}
/**
 * Build the "key,value" string for the child's current tuple and cache it
 * in kv->catted.  When the value is empty the result is just the key.
 *
 * @return The size of the cached string including its terminating NUL,
 *         which is the same convention kv_key and kv_value use when they
 *         return an already cached string.
 */
static int mkCatted(int xid, kv_impl * kv) {
  char * key = 0;
  char * val = 0;
  Titerator_key(xid, kv->it, (byte**)&key);
  Titerator_value(xid, kv->it, (byte**)&val);
  /* NOTE(review): a child that leaves the pointer unset is treated as having
     supplied an empty string -- confirm that is the desired behaviour. */
  if(!key) { key = ""; }
  if(!val) { val = ""; }

  size_t keylen = strlen(key);
  size_t vallen = strlen(val);
  /* key [ ',' value ] NUL */
  size_t total = keylen + (vallen ? vallen + 1 : 0) + 1;

  kv->catted = malloc(total);
  if(!kv->catted) {
    perror("mkCatted");
    abort();
  }
  memcpy(kv->catted, key, keylen);
  if(vallen) {
    kv->catted[keylen] = ',';
    memcpy(kv->catted + keylen + 1, val, vallen);
  }
  kv->catted[total - 1] = '\0';
  return (int)total;
}
/* Hand out the concatenated string as the key, building it on first use.
   Returns mkCatted's result when the string had to be built, otherwise the
   cached string's length plus one. */
static int kv_key(int xid, void * it, byte ** key) {
  kv_impl * state = it;
  int size;
  if(state->catted) {
    size = strlen(state->catted)+1;
  } else {
    size = mkCatted(xid, state);
  }
  *key = (byte*)state->catted;
  return size;
}
/* Hand out the concatenated string as the value, building it on first use.
   Returns mkCatted's result when the string had to be built, otherwise the
   cached string's length plus one. */
static int kv_value(int xid, void * it, byte ** val) {
  kv_impl * state = it;
  int size;
  if(state->catted) {
    size = strlen(state->catted)+1;
  } else {
    size = mkCatted(xid, state);
  }
  *val = (byte*)state->catted;
  return size;
}
static void kv_tupleDone(int xid, void * it) {
kv_impl * kv = it;
Titerator_tupleDone(xid, kv->it);
}
static void kv_releaseLock(int xid, void *it) {
kv_impl * kv = it;
Titerator_releaseLock(xid, kv->it);
}
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// SELECT ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
/**
 * Build an iterator that yields only the child's tuples that match pred
 * (see matchPredicate).
 *
 * Ownership: pred is always consumed.  It is freed here on failure and by
 * s_close otherwise.  The child iterator is NOT closed on failure; that is
 * left to the caller.
 *
 * @param xid  Unused here.
 * @param it   Child iterator, or 0 if building it failed.
 * @param pred NULL-terminated predicate, or 0 if parsing it failed.
 * @return A new SELECT_ITERATOR, or 0 if either argument is missing.
 */
lladdIterator_t* ReferentialAlgebra_Select(int xid, lladdIterator_t * it, char ** pred) {
  if(!it || !pred) {
    /* parseTuple returns 0 on a syntax error, so pred must be checked
       before it is handed to tplFree or stored for matchPredicate. */
    if(pred) { tplFree(pred); }
    return 0;
  }
  select_impl * s = malloc(sizeof(select_impl));
  s->p = pred;
  s->it = it;
  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
  new_it->type = SELECT_ITERATOR;
  new_it->impl = s;
  return new_it;
}
/**
 * Test a tuple against a selection predicate.
 *
 * The tuple is split into columns at commas and spaces (runs of separators
 * count as one).  It matches when it has exactly as many columns as the
 * predicate has entries and every entry is either "*" or equal to the
 * corresponding column.
 *
 * The tuple is scanned in place: nothing is allocated and strtok() is not
 * used, so this is safe to call from several threads at once.
 *
 * @param tup  NUL-terminated tuple text; not modified.
 * @param pred NULL-terminated array of column patterns.
 * @return 1 on a match, 0 otherwise.
 */
static int matchPredicate(const char * tup, char ** pred) {
  static const char DELIM[] = ", ";
  const char * cur = tup + strspn(tup, DELIM);
  size_t col = 0;

  while(*cur) {
    if(!pred[col]) {
      return 0;   /* tuple has more columns than the predicate */
    }
    size_t len = strcspn(cur, DELIM);
    if(strcmp(pred[col], "*") != 0 &&
       (strlen(pred[col]) != len || memcmp(pred[col], cur, len) != 0)) {
      return 0;
    }
    col++;
    cur += len;
    cur += strspn(cur, DELIM);
  }
  /* Every column matched; the predicate must be used up as well. */
  return pred[col] == 0;
}
/* Close the child iterator and release the predicate and the state. */
static void s_close(int xid, void * it) {
  select_impl * sel = it;
  Titerator_close(xid, sel->it);
  tplFree(sel->p);
  free(sel);
}
/* Advance the child until its value satisfies the predicate.  Rejected
   tuples are released with tupleDone; the matching one is left current.
   Returns 1 when positioned on a match, 0 when the child is exhausted. */
static int s_next(int xid, void * it) {
  select_impl * sel = it;
  for(;;) {
    if(!Titerator_next(xid, sel->it)) {
      return 0;
    }
    char * tuple;
    Titerator_value(xid, sel->it, (byte**)&tuple);
    if(matchPredicate(tuple, sel->p)) {
      return 1;
    }
    Titerator_tupleDone(xid, sel->it);
  }
}
/* tryNext is not supported by the select iterator; calling it aborts. */
static int s_tryNext(int xid, void * it) {
  (void)xid;
  (void)it;
  abort();
}
/* The key of a selected tuple is the child's key, passed through unchanged. */
static int s_key(int xid, void * it, byte ** key) {
  select_impl * sel = it;
  return Titerator_key(xid, sel->it, key);
}
/* The value of a selected tuple is the child's value, passed through unchanged. */
static int s_value(int xid, void * it, byte ** val) {
  select_impl * sel = it;
  return Titerator_value(xid, sel->it, val);
}
static void s_tupleDone(int xid, void * it) {
Titerator_tupleDone(xid, ((select_impl*)it)->it);
}
static void s_releaseLock(int xid, void *it) {
Titerator_releaseLock(xid, ((select_impl*)it)->it);
}
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// PROJECT ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
/* State of a PROJECT_ITERATOR. */
typedef struct project_impl {
int count;             /* number of entries in cols */
short * cols;          /* zero-based source column for each output column */
lladdIterator_t * it;  /* child iterator */
char * val;            /* projected string cached by p_value; freed in p_tupleDone */
} project_impl;
/**
 * Build an iterator that rewrites each child tuple to the listed columns.
 *
 * Ownership: project is always freed before returning.  The child iterator
 * is NOT closed on failure; that is left to the caller.
 *
 * @param xid     Unused here.
 * @param it      Child iterator, or 0 if building it failed.
 * @param project NULL-terminated list of zero-based decimal column numbers,
 *                or 0 if parsing it failed.
 * @return A new PROJECT_ITERATOR, or 0 if an argument is missing or a column
 *         descriptor is not a number that fits the iterator's column table.
 */
lladdIterator_t* ReferentialAlgebra_Project(int xid, lladdIterator_t * it, char ** project) {
  if(!it || !project) {
    /* parseTuple returns 0 on a syntax error; it must not be dereferenced. */
    if(project) { tplFree(project); }
    return 0;
  }

  int projectcount = 0;
  while(project[projectcount]) { projectcount++; }

  /* One spare slot so that an empty projection never asks for zero bytes. */
  short * cols = malloc(sizeof(short) * (projectcount + 1));

  for(int i = 0; i < projectcount; i++) {
    char * eos;
    long col = strtol(project[i], &eos, 10);
    /* Reject empty or non-numeric text, and anything that cannot be stored
       in a short.  p_value uses these numbers as array indexes, so a
       negative or wrapped value would read out of bounds.  Overflow inside
       strtol yields LONG_MAX/LONG_MIN, which the range test rejects too. */
    if(eos == project[i] || *eos != '\0' || col < 0 || col > SHRT_MAX) {
      printf("Couldn't parse column descriptor\n");
      tplFree(project);
      free(cols);
      return 0;
    }
    cols[i] = (short)col;
  }

  project_impl * p = malloc(sizeof(project_impl));
  p->count = projectcount;
  p->cols = cols;
  p->it = it;
  p->val = 0;

  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
  new_it->type = PROJECT_ITERATOR;
  new_it->impl = p;
  tplFree(project);
  return new_it;
}
static void p_close(int xid, void * it) {
project_impl * impl = it;
Titerator_close(xid, impl->it);
free(impl->cols);
free(impl);
}
static int p_next(int xid, void * it) {
project_impl * impl = it;
return Titerator_next(xid, impl->it);
}
static int p_tryNext(int xid, void * it) {
project_impl * impl = it;
return Titerator_tryNext(xid, impl->it);
}
/* The key is passed through from the child unchanged; only the value is projected. */
static int p_key(int xid, void * it, byte ** key) {
  return Titerator_key(xid, ((project_impl*)it)->it, key);
}
/**
 * Produce the projected form of the child's current value.
 *
 * The child's value is split at commas and spaces, and the requested
 * columns are joined with ','.  The result is cached in impl->val until
 * p_tupleDone frees it, so repeated calls return the same string.
 *
 * A requested column that the tuple does not have is reported and skipped.
 * Separators are only written between columns that were actually emitted,
 * so a skipped leading column does not leave a stray ','.
 *
 * @param val Out: the projected string, or 0 if the child had no value.
 * @return 1 when a cached string is returned, otherwise whatever the
 *         child's value call returned (0 meaning "no value").
 */
static int p_value(int xid, void * it, byte ** val) {
  project_impl * impl = it;
  if(impl->val) {
    *val = (byte*)impl->val;
    return 1;
  }

  byte * in_val;
  int ret = Titerator_value(xid, impl->it, &in_val);
  if(!ret) {
    /* Never cache (and later free) a pointer that was not set here. */
    *val = 0;
    return ret;
  }

  char * freeme;
  int count;
  char ** tok = split((char*)in_val, &freeme, &count, ", ");

  /* Size the output once: every usable column plus a separator or NUL. */
  size_t cap = 1;
  for(int i = 0; i < impl->count; i++) {
    short c = impl->cols[i];
    if(c >= 0 && c < count) {
      cap += strlen(tok[c]) + 1;
    }
  }
  char * out = malloc(cap);
  if(!out) {
    perror("p_value");
    abort();
  }

  size_t len = 0;
  int emitted = 0;
  for(int i = 0; i < impl->count; i++) {
    short c = impl->cols[i];
    if(c < 0 || c >= count) {
      printf("Tuple is too short for pattern.\n");
      continue;
    }
    if(emitted) {
      out[len++] = ',';
    }
    size_t n = strlen(tok[c]);
    memcpy(out + len, tok[c], n);
    len += n;
    emitted++;
  }
  out[len] = '\0';

  free(freeme);
  free(tok);

  impl->val = out;
  *val = (byte*)out;
  return ret;
}
/* Release the cached projected string, then forward tupleDone to the child. */
static void p_tupleDone(int xid, void * it) {
  project_impl * proj = it;
  free(proj->val);   /* free(NULL) is a no-op */
  proj->val = 0;
  Titerator_tupleDone(xid, proj->it);
}
static void p_releaseLock(int xid, void *it) {
project_impl * impl = it;
Titerator_releaseLock(xid,impl->it);
}
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// JOIN (naive nested loops) ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
/* State of a JOIN_ITERATOR.  The inner relation is read completely into
   memory by ReferentialAlgebra_Join; the outer relation is streamed.
   The three inner_* / freethese arrays are parallel, and inner_tpls and
   inner_strs are NULL-terminated. */
typedef struct join_impl {
char *** inner_tpls;         /* per inner tuple: its columns (pointers into freethese[i]) */
char ** inner_strs;          /* per inner tuple: a copy of the unsplit value */
char ** freethese;           /* per inner tuple: the buffer that backs inner_tpls[i] */
lladdIterator_t * outer_it;  /* outer (streamed) child iterator */
lladdIterator_t * inner_it;  /* inner child iterator; kept only so it can be closed */
char ** pred;                /* NULL-terminated join predicates, e.g. "0=1"; owned by the iterator */
int inner_pos;               /* index of the next inner tuple to compare */
char ** outer_tpl;           /* columns of the current outer tuple, or 0 */
char * outer_str;            /* current outer value as returned by the outer iterator */
char * freeouter;            /* buffer that backs outer_tpl */
} join_impl;
/**
 * Test a pair of tuples against a list of join predicates.
 *
 * Each predicate has the form "<lhs><op><rhs>" with optional blanks around
 * the operator.  lhs and rhs are runs of alphanumeric characters read as
 * decimal column numbers (a run that is empty or does not start with a
 * digit reads as 0); lhs indexes tup1 and rhs indexes tup2.  Only the
 * operator "=" is evaluated; any other operator places no constraint on
 * the tuples.
 *
 * Predicates are parsed in place, without allocating.  The operator scan
 * stops at the end of the string, so a predicate with no operator or no
 * right-hand side is handled without reading past its terminator.
 *
 * @param tup1 NULL-terminated columns of the left tuple.
 * @param tup2 NULL-terminated columns of the right tuple.
 * @param pred NULL-terminated list of predicates.
 * @return 1 if every predicate holds (or the list is empty), 0 otherwise,
 *         including when a predicate names a column a tuple does not have.
 */
static int matchComparator(char ** tup1,
                           char ** tup2,
                           char ** pred) {
  if(!pred[0]) {
    return 1;   /* nothing to check; the tuples are not examined */
  }

  /* The column counts do not depend on the predicate: compute them once. */
  long colcount1 = 0;
  long colcount2 = 0;
  while(tup1[colcount1]) { colcount1++; }
  while(tup2[colcount2]) { colcount2++; }

  for(int i = 0; pred[i]; i++) {
    /* ctype functions need unsigned char values, hence the casts. */
    const char * p = pred[i];

    const char * lhs = p;
    while(isalnum((unsigned char)*p)) { p++; }
    long col1 = (p > lhs) ? strtol(lhs, NULL, 10) : 0;

    while(isblank((unsigned char)*p)) { p++; }
    const char * op = p;
    while(*p && !isblank((unsigned char)*p) && !isalnum((unsigned char)*p)) { p++; }
    size_t op_len = (size_t)(p - op);

    while(isblank((unsigned char)*p)) { p++; }
    const char * rhs = p;
    while(isalnum((unsigned char)*p)) { p++; }
    long col2 = (p > rhs) ? strtol(rhs, NULL, 10) : 0;

    if(colcount1 <= col1 || colcount2 <= col2) {
      printf("not enough columns for join!\n");
      return 0;
    }
    if(op_len == 1 && op[0] == '=' && strcmp(tup1[col1], tup2[col2]) != 0) {
      return 0;
    }
  }
  return 1;
}
/**
 * Build a nested-loops join of two iterators.
 *
 * The inner relation is read completely into memory here.  Each inner value
 * is kept both as a copy of the unsplit string and as split columns.  The
 * outer relation is streamed by j_next.  The join iterator is wrapped in a
 * key/value concatenating iterator before it is returned.
 *
 * Ownership: pred is always consumed.  It is freed here on failure and by
 * j_close otherwise.  The child iterators are NOT closed on failure; that
 * is left to the caller.
 *
 * @param xid      Transaction id used to read the inner relation.
 * @param pred     NULL-terminated join predicates (see matchComparator),
 *                 or 0 if parsing them failed.
 * @param outer_it Outer child iterator, or 0.
 * @param inner_it Inner child iterator, or 0.
 * @return The wrapped join iterator, or 0 if any argument is missing.
 */
lladdIterator_t* ReferentialAlgebra_Join(int xid,
                                         char ** pred,
                                         lladdIterator_t * outer_it,
                                         lladdIterator_t * inner_it) {
  if(!(outer_it && inner_it && pred)) {
    if(pred) { tplFree(pred); }
    return 0;
  }

  join_impl * j = malloc(sizeof(join_impl));
  j->inner_it = inner_it;
  j->outer_it = outer_it;
  j->pred = pred;
  j->inner_pos = 0;
  /* No outer tuple yet.  All three fields are set so that j_close and
     j_tupleDone never see an indeterminate pointer. */
  j->outer_tpl = 0;
  j->outer_str = 0;
  j->freeouter = 0;

  /* The arrays always hold one slot more than the number of tuples; the
     last slot is the 0 terminator. */
  j->inner_tpls = calloc(1, sizeof *j->inner_tpls);
  j->inner_strs = calloc(1, sizeof *j->inner_strs);
  j->freethese  = calloc(1, sizeof *j->freethese);

  int n = 0;
  while(Titerator_next(xid, inner_it)) {
    char * in_val;
    Titerator_value(xid, inner_it, (byte**)&in_val);

    j->inner_tpls = realloc(j->inner_tpls, sizeof *j->inner_tpls * (n+2));
    j->inner_strs = realloc(j->inner_strs, sizeof *j->inner_strs * (n+2));
    j->freethese  = realloc(j->freethese,  sizeof *j->freethese  * (n+2));

    int count;
    j->inner_tpls[n] = split(in_val, &j->freethese[n], &count, ", ");
    j->inner_strs[n] = strdup(in_val);
    n++;
    j->inner_tpls[n] = 0;
    j->inner_strs[n] = 0;
    j->freethese[n] = 0;

    Titerator_tupleDone(xid, inner_it);
  }

  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
  new_it->type = JOIN_ITERATOR;
  new_it->impl = j;
  return ReferentialAlgebra_KeyValCat(xid,new_it);
}
static void j_close(int xid, void * it) {
join_impl * j = it;
Titerator_close(xid,j->outer_it);
Titerator_close(xid,j->inner_it);
for(int i = 0; j->inner_tpls[i]; i++) {
// tplFree(j->inner_tpls[i]);
free(j->freethese[i]);
free(j->inner_strs[i]);
free(j->inner_tpls[i]);
}
tplFree(j->pred);
free(j->inner_tpls);
free(j->inner_strs);
free(j->freethese);
if(j->freeouter) { free(j->freeouter); }
// don't free pred; that's the caller's problem.
free(j);
}
/**
 * Advance to the next (outer, inner) pair that satisfies the predicates.
 *
 * For each outer tuple, every materialized inner tuple is tried in turn.
 * On success inner_pos is left one past the matching inner tuple, which is
 * what j_value relies on.
 *
 * @return 1 when positioned on a matching pair, 0 when the outer relation
 *         is exhausted.
 */
static int j_next(int xid, void * it) {
  join_impl * j = it;
  for(;;) {
    /* Fetch a new outer tuple when the inner relation has been used up for
       the current one, or when no outer tuple has been read yet. */
    if(!j->inner_tpls[j->inner_pos] || (!j->outer_tpl && j->inner_pos == 0)) {
      if(j->outer_tpl) {
        /* The previous outer tuple produced no match that the caller
           released; drop its split here so it is not leaked. */
        free(j->outer_tpl);
        free(j->freeouter);
        j->outer_tpl = 0;
        j->freeouter = 0;
      }
      j->inner_pos = 0;
      Titerator_tupleDone(xid, j->outer_it);
      if(!Titerator_next(xid, j->outer_it)) {
        return 0;
      }
      int count;
      Titerator_value(xid, j->outer_it, (byte**)&j->outer_str);
      j->outer_tpl = split((char*)j->outer_str, &j->freeouter, &count, ", ");
      /* Re-check the loop condition: with an empty inner relation there is
         no inner tuple to compare against. */
      continue;
    }

    if(!j->outer_tpl) {
      /* j_tupleDone released the split after the previous match, but the
         current outer tuple still has inner tuples left to try.  Split it
         again instead of skipping to the next outer tuple.
         NOTE(review): this assumes outer_str stays valid until tupleDone
         is called on the outer iterator -- confirm. */
      int count;
      j->outer_tpl = split((char*)j->outer_str, &j->freeouter, &count, ", ");
    }

    int matched = matchComparator(j->outer_tpl, j->inner_tpls[j->inner_pos], j->pred);
    j->inner_pos++;
    if(matched) {
      return 1;
    }
  }
}
/* tryNext is not supported by the join iterator; calling it aborts. */
static int j_tryNext(int xid, void * it) {
  (void)xid;
  (void)it;
  abort();
}
/* The key of a joined pair is the current outer value.  Always returns 1. */
static int j_key(int xid, void * it, byte ** key) {
  *key = (byte*)((join_impl*)it)->outer_str;
  return 1;
}
/* The value of a joined pair is the inner string that matched.  j_next
   leaves inner_pos one past that tuple, hence the -1.  Always returns 1. */
static int j_value(int xid, void * it, byte ** val) {
  join_impl * join = it;
  int matched = join->inner_pos - 1;
  *val = (byte*)join->inner_strs[matched];
  return 1;
}
/* Release the split form of the current outer tuple and clear both
   pointers.  The outer iterator itself is not touched. */
static void j_tupleDone(int xid, void * it) {
  join_impl * join = it;
  free(join->freeouter);
  join->freeouter = 0;
  free(join->outer_tpl);
  join->outer_tpl = 0;
}
/* Intentionally does nothing: the join does not forward releaseLock to
   either child. */
static void j_releaseLock(int xid, void *it) {
  (void)xid;
  (void)it;
}
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// INITIALIZATION ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
/* Register the callback tables of the four iterator kinds defined in this
   file, in the order select, project, key/value, join.  Each table lists
   close, next, tryNext, key, value, tupleDone, releaseLock. */
void ReferentialAlgebra_init() {
  const struct {
    int type;
    lladdIterator_def_t def;
  } kinds[] = {
    { SELECT_ITERATOR,
      { s_close, s_next, s_tryNext, s_key, s_value, s_tupleDone, s_releaseLock } },
    { PROJECT_ITERATOR,
      { p_close, p_next, p_tryNext, p_key, p_value, p_tupleDone, p_releaseLock } },
    { KEYVAL_ITERATOR,
      { kv_close, kv_next, kv_tryNext, kv_key, kv_value, kv_tupleDone, kv_releaseLock } },
    { JOIN_ITERATOR,
      { j_close, j_next, j_tryNext, j_key, j_value, j_tupleDone, j_releaseLock } },
  };
  for(unsigned k = 0; k < sizeof(kinds)/sizeof(kinds[0]); k++) {
    lladdIterator_register(kinds[k].type, kinds[k].def);
  }
}
//////////////////////////////////////////////////////////////////////////////////
/// ///
/// PARSER ///
/// ///
//////////////////////////////////////////////////////////////////////////////////
// Reserved characters: {, }, (, ), ",", " ".
// Grammar:
// E = {s (tuple) E} | {p (tuple) E} | {j (tuple) E E} | TABLENAME
// tuple = (empty) | V | V,tuple
// V = string of letters, digits, '*' and '='
// TABLENAME = string of letters, digits, '*' and '='
// Token kinds returned by nextToken().  Each kind is represented by a
// character constant.  Note the spelling: LPAREN/RPAREN enclose an
// expression and are written '{' and '}', while LBRACKET/RBRACKET enclose a
// tuple and are written '(' and ')'.
#define LPAREN '{'
#define RPAREN '}'
#define LBRACKET '('
#define RBRACKET ')'
#define COMMA ','
// nextToken() skips blanks before it classifies a character, so SPACE is
// not expected to be returned in practice.
#define SPACE ' '
// A run of letters, digits, '*' and '='.
#define STRING 's'
// End of the input string.
#define EOS '\0'
/*
Read the next token starting at *head and advance *head past it.
@return one of the token kinds above. If STRING is returned, *tok is set
to a newly allocated copy of the token, which the caller must free. For
every other kind *tok is left untouched.
*/
int nextToken(char ** head, char ** tok);
/**
 * Parse a parenthesized, comma separated tuple such as "(a,*,c)" or "()".
 *
 * Syntax errors are reported by returning 0 rather than through assert(),
 * so malformed input cannot abort the process and every token read so far
 * is released.
 *
 * @param head In/out: position in the input; advanced past what was read.
 * @return A newly allocated, NULL-terminated array of newly allocated
 *         strings (release with tplFree), or 0 on a syntax error.
 */
char** parseTuple(char ** head) {
  char * mytok = 0;
  int ret = nextToken(head, &mytok);
  if(ret != LBRACKET) {
    if(ret == STRING) { free(mytok); }
    return 0;
  }

  char ** tok = calloc(1, sizeof(char*));
  int count = 0;
  while(1) {
    /* Expect a value, or the closing bracket. */
    ret = nextToken(head, &mytok);
    if(ret == RBRACKET) {
      break;
    }
    if(ret != STRING) {
      tplFree(tok);
      return 0;
    }
    count++;
    tok = realloc(tok, sizeof(char*)*(count+1));
    tok[count-1] = mytok;
    tok[count] = 0;

    /* Expect a comma, or the closing bracket. */
    ret = nextToken(head, &mytok);
    if(ret == RBRACKET) {
      break;
    }
    if(ret == STRING) { free(mytok); }
    if(ret != COMMA) {
      tplFree(tok);
      return 0;
    }
  }
  return tok;
}
/**
 * Parse a relational algebra expression and build the iterator tree for it.
 *
 * An expression is either a table name, or "{op (tuple) E ...}" where op is
 * 's' (select, one operand), 'p' (project, one operand) or 'j' (join, two
 * operands).
 *
 * All syntax errors are reported by returning 0; nothing is asserted on
 * user input.  Iterators that were already built are closed on failure.
 *
 * @param xid     Transaction id used to open tables and read join inputs.
 * @param catalog Hash that maps table names to their recordids.
 * @param head    In/out: position in the input; advanced past what was read.
 * @return The iterator for the expression, or 0 on error.
 */
lladdIterator_t * parseExpression(int xid, recordid catalog,
                                  char **head) {
  while(isblank((unsigned char)**head)) { (*head)++; }

  if(**head != LPAREN) {
    /* A bare table name: scan it and present key and value as one tuple. */
    char * tablename = 0;
    int ret = nextToken(head, &tablename);
    if(ret != STRING) {
      printf("parse error\n");
      return 0;
    }
    lladdIterator_t * scan =
      ReferentialAlgebra_OpenTableScanner(xid, catalog, tablename);
    free(tablename);
    if(!scan) { return 0; }
    return ReferentialAlgebra_KeyValCat(xid, scan);
  }

  (*head)++;
  char op = **head;
  lladdIterator_t * it = 0;

  if(op == 's' || op == 'p' || op == 'j') {
    (*head)++;
    char ** pred = parseTuple(head);
    if(!pred) {
      /* Do not hand a missing tuple to the operator constructors. */
      printf("parse error\n");
      return 0;
    }
    if(op == 'j') {
      lladdIterator_t * outer = parseExpression(xid, catalog, head);
      lladdIterator_t * inner = parseExpression(xid, catalog, head);
      it = ReferentialAlgebra_Join(xid, pred, outer, inner);
      /* The constructors consume pred but leave the operands to us. */
      if(outer && !it) { Titerator_close(xid, outer); }
      if(inner && !it) { Titerator_close(xid, inner); }
    } else {
      lladdIterator_t * child = parseExpression(xid, catalog, head);
      it = (op == 's') ? ReferentialAlgebra_Select(xid, child, pred)
                       : ReferentialAlgebra_Project(xid, child, pred);
      if(child && !it) { Titerator_close(xid, child); }
    }
  } else {
    printf("Unknown operator\n");
  }

  if(!it) {
    printf("parse error\n");
    return 0;
  }

  /* The expression must be closed by RPAREN. */
  char * trailing = 0;
  int ret = nextToken(head, &trailing);
  if(ret != RPAREN) {
    if(ret == STRING) { free(trailing); }
    printf("parse error\n");
    Titerator_close(xid, it);
    return 0;
  }
  return it;
}
/**
 * Read the next token from *head and advance *head past it.
 *
 * Leading blanks are skipped.  A single reserved character is returned as
 * its own token kind.  Anything else is read as a STRING made of letters,
 * digits, '*' and '='.  A character that fits none of these yields an empty
 * STRING and *head is not advanced, so callers must treat an unexpected
 * STRING as an error rather than retrying.
 *
 * @param head In/out: position in the input.
 * @param tok  Out: only written when STRING is returned; receives a newly
 *             allocated copy of the token that the caller must free.
 * @return LPAREN, RPAREN, LBRACKET, RBRACKET, COMMA, SPACE, STRING or EOS.
 */
int nextToken(char ** head, char ** tok) {
  /* ctype functions need unsigned char values, hence the casts.  isblank
     is false for '\0', so this cannot run past the end of the input. */
  while(isblank((unsigned char)**head)) { (*head)++; }

  switch(**head) {
  case LPAREN:
  case RPAREN:
  case LBRACKET:
  case RBRACKET:
  case COMMA:
  case SPACE: {
    /* Each of these token kinds is the character itself. */
    int kind = **head;
    (*head)++;
    return kind;
  }
  case EOS:
    return EOS;
  default: {
    char * first = *head;
    while(isalnum((unsigned char)**head) || **head=='*' || **head=='=') {
      (*head)++;
    }
    size_t len = (size_t)(*head - first);
    *tok = calloc(len + 1, sizeof(char));   /* zeroed, so already terminated */
    if(!*tok) {
      perror("nextToken");
      abort();
    }
    memcpy(*tok, first, len);
    return STRING;
  }
  }
}
/**
 * Parse and run a query, collecting every result value.
 *
 * @param xid  Transaction id passed to the parser and the iterator calls.
 * @param hash Catalog hash used to resolve table names.
 * @param line Query text.
 * @return A newly allocated, NULL-terminated array of newly allocated
 *         copies of the result values, or 0 if the query did not parse.
 */
char ** executeQuery(int xid, recordid hash, char * line) {
  char * cursor = line;
  lladdIterator_t * it = parseExpression(xid,hash,&cursor);
  if(!it) {
    return 0;
  }

  int n = 0;
  char ** rows = malloc(sizeof(char*));
  while(Titerator_next(xid, it)) {
    /* Room for this row plus the terminator. */
    rows = realloc(rows, sizeof(char*)*(n+2));
    char * row;
    Titerator_value(xid,it,(byte**)&row);
    /* The iterator owns its value; keep a private copy. */
    rows[n] = strdup(row);
    n++;
    Titerator_tupleDone(xid,it);
  }
  Titerator_close(xid,it);
  rows[n] = 0;
  return rows;
}

View file

@ -0,0 +1,19 @@
/* Public interface of the relational algebra layer.
   NOTE(review): this header uses lladdIterator_t and recordid but does not
   include the header that declares them; every includer in this commit
   includes <stasis/transactional.h> (directly or through dml.h) first. */
#ifndef __ALGEBRA_H
#define __ALGEBRA_H
/* Open an iterator over the named table; returns 0 if the catalog has no
   such table. */
lladdIterator_t* ReferentialAlgebra_OpenTableScanner(int xid, recordid catalog,
char * tablename);
/* Filter `it` by `predicate`.  The predicate is consumed.  Returns 0 if
   `it` is 0. */
lladdIterator_t* ReferentialAlgebra_Select(int xid, lladdIterator_t* it,
char ** predicate);
/* Rewrite each tuple of `it` to the columns listed in `project`.  The list
   is consumed.  Returns 0 if `it` is 0 or a column descriptor is invalid. */
lladdIterator_t* ReferentialAlgebra_Project(int xid, lladdIterator_t * it,
char ** project);
/* Wrap `it` so that key and value are presented as one "key,value" string. */
lladdIterator_t* ReferentialAlgebra_KeyValCat(int xid, lladdIterator_t * it);
/* Format a recordid as "page,slot,size"; the caller frees the result. */
char * tplRid(recordid rid);
/* Inverse of tplRid. */
recordid ridTpl(char * tpl);
/* Register the iterator types defined by this layer. */
void ReferentialAlgebra_init();
/* Parse and run a query.  Returns a NULL-terminated array of result strings
   (the caller frees each string and the array), or 0 on a parse error. */
char ** executeQuery(int xid, recordid hash, char * line);
/* Parse one expression starting at *head and advance *head; returns 0 on
   error. */
lladdIterator_t * parseExpression(int xid, recordid catalog, char **head);
/* Parse one "(a,b,...)" tuple starting at *head and advance *head; returns 0
   on some syntax errors. */
char** parseTuple(char ** head);
#endif

View file

@ -0,0 +1,61 @@
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include "dml.h"
#include "algebra.h"
/**
 * Execute an "insert <table> <key>[,<rest of tuple>]" command.
 *
 * The first field of the tuple becomes the hash key and the remainder
 * (possibly empty) becomes the value.  Both are stored with their
 * terminating NUL.
 *
 * The line is tokenized in a private copy without strtok(), so this may run
 * in several interpreter threads at once, and every exit path releases what
 * was allocated.
 *
 * @param xid    Transaction to perform the insert in.
 * @param tables Catalog hash that maps table names to recordid tuples.
 * @param insert The full command line, starting with "insert".
 * @return 1 on success; 0 (after printing a message) if the line cannot be
 *         parsed or the table does not exist.
 */
int executeInsert(int xid, recordid tables, char * insert) {
  int ok = 0;
  char * table = 0;
  char * key = 0;
  char * trunctup = "";
  char * linecopy = strdup(insert+strlen("insert"));

  /* Table name: the first space delimited word. */
  char * tbl = linecopy + strspn(linecopy, " ");
  char * rest = tbl + strcspn(tbl, " ");

  if(*tbl && *rest) {
    *rest++ = '\0';
    /* Tuple: everything after the table name up to the end of the line. */
    rest += strspn(rest, "\r\n");
    rest[strcspn(rest, "\r\n")] = '\0';

    /* Key: the first comma delimited field; the value is what follows. */
    char * first = rest + strspn(rest, ",");
    if(*first) {
      key = first;
      char * comma = key + strcspn(key, ",");
      if(*comma) {
        *comma = '\0';
        trunctup = comma + 1;
      }
    }
  }
  if(!key) {
    printf("parse error\n");
    goto done;
  }

  int sz = ThashLookup(xid, tables, (byte*)tbl, strlen(tbl)+1, (byte**)&table);
  if(sz == -1) {
    printf("Unknown table %s\n", tbl);
    goto done;
  }
  assert((size_t)sz == strlen(table)+1);
  recordid tableRid = ridTpl(table);
  /* The lookup result is heap allocated and owned by the caller. */
  free(table);

  ThashInsert(xid, tableRid, (byte*)key, strlen(key)+1,(byte*)trunctup,strlen(trunctup)+1);
  ok = 1;

done:
  free(linecopy);
  return ok;
}
/**
 * Execute a "delete <table> <key>[,...]" command.
 *
 * Only the first field of the tuple is used; it is the hash key to remove.
 *
 * The line is tokenized in a private copy without strtok(), so this may run
 * in several interpreter threads at once, and every exit path releases what
 * was allocated.
 *
 * @param xid    Transaction to perform the delete in.
 * @param tables Catalog hash that maps table names to recordid tuples.
 * @param delete The full command line, starting with "delete".
 * @return 1 on success; 0 (after printing a message) if the line cannot be
 *         parsed or the table does not exist.
 */
int executeDelete(int xid, recordid tables, char * delete) {
  int ok = 0;
  char * table = 0;
  char * key = 0;
  char * linecopy = strdup(delete+strlen("delete"));

  /* Table name: the first space delimited word. */
  char * tbl = linecopy + strspn(linecopy, " ");
  char * rest = tbl + strcspn(tbl, " ");

  if(*tbl && *rest) {
    *rest++ = '\0';
    /* Tuple: everything after the table name up to the end of the line. */
    rest += strspn(rest, "\r\n");
    rest[strcspn(rest, "\r\n")] = '\0';

    /* Key: the first comma delimited field. */
    char * first = rest + strspn(rest, ",");
    if(*first) {
      key = first;
      key[strcspn(key, ",")] = '\0';
    }
  }
  if(!key) {
    printf("parse error\n");
    goto done;
  }

  int sz = ThashLookup(xid, tables, (byte*)tbl, strlen(tbl)+1, (byte**)&table);
  if(sz == -1) {
    printf("Unknown table %s\n", tbl);
    goto done;
  }
  assert((size_t)sz == strlen(table)+1);
  recordid tableRid = ridTpl(table);
  /* The lookup result is heap allocated and owned by the caller. */
  free(table);

  printf("deleted ->%s<- from %s\n", key, tbl);
  ThashRemove(xid, tableRid, (byte*)key, strlen(key)+1);
  ok = 1;

done:
  free(linecopy);
  return ok;
}

View file

@ -0,0 +1,4 @@
/* Data manipulation commands (insert / delete) for the referential layer. */
#ifndef REFERENTIAL_DML_H
#define REFERENTIAL_DML_H
#include <stasis/transactional.h>
/* Execute a "delete <table> <key>[,...]" command line.
   Returns 1 on success, 0 on a parse error or unknown table. */
int executeDelete(int xid, recordid tables, char * delete);
/* Execute an "insert <table> <key>[,<value>]" command line.
   Returns 1 on success, 0 on a parse error or unknown table. */
int executeInsert(int xid, recordid tables, char *insert);
#endif

View file

@ -0,0 +1,26 @@
query
query TABLES
query {s (1) TABLES}
query {s (TABLES,*,*,*) TABLES}
query {s (TABLES,*,*,*,*) TABLES}
query {p (1,2,3,4) TABLES}
query {p (0,1,2,3) TABLES}
query {p (0,0,0,0) TABLES}
query {p (0,1,2,3,0,1,2,3,0) TABLES}
query {p (0,1,2,3,0,1,2,3,0) xxx}
query {p (0) xxx}
query {s (*) xxx}
query {j (1=1) xxx yyy}
query {p (a,b,c) TABLES}
query {p (0,1,2) TABLES}
query {s () TABLES}
query {p () TABLES}
query {s () {p () TABLES} }
query {j () TABLES TABLES}
query {j (0=0) TABLES TABLES}
query {j (0=0,1=1) TABLES TABLES}
query {j (0=0,1=2) TABLES TABLES}
query {j (0<0) TABLES TABLES}
query {x y z}
create foo
insert foo bar

View file

@ -0,0 +1,362 @@
#define _GNU_SOURCE
#include <sys/types.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stasis/transactional.h>
#include "algebra.h"
#include "dml.h"
/* Backlog passed to listen(). */
#define MAX_CONN_QUEUE 10
/* Number of interpreter threads started by startServer(). */
#define THREAD_POOL 20
/* Return value of openInterpreter() that asks the whole server to stop. */
#define SHUTDOWN_SERVER 1
/* Argument handed to each interpreter thread; freed by the thread. */
typedef struct thread_arg {
int id;         /* index of the thread in the pool arrays below */
recordid hash;  /* catalog hash passed on to openInterpreter() */
} thread_arg;
/* Protects nextSocket, shuttingdown and the per-thread arrays below. */
pthread_mutex_t interpreter_mut = PTHREAD_MUTEX_INITIALIZER;
/* Signalled by the accept loop when a connection is waiting in nextSocket. */
pthread_cond_t interpreter_cond = PTHREAD_COND_INITIALIZER;
/* Signalled by a worker once it has taken the connection out of nextSocket. */
pthread_cond_t nextSocket_cond = PTHREAD_COND_INITIALIZER;
/* One-slot hand-off from the accept loop to the workers; 0 when empty. */
FILE * nextSocket = 0;
/* Set to 1 once a session has asked for the server to shut down. */
int shuttingdown = 0;
#define INTERPRETER_BUSY 1
#define INTERPRETER_IDLE 0
/* Per-thread state, indexed by thread_arg.id. */
char interpreterStates[THREAD_POOL];
pthread_t interpreters[THREAD_POOL];
FILE * interpreterConnections[THREAD_POOL];
int openInterpreter(FILE * in, FILE * out, recordid hash);
/*
Body of one worker thread in the pool.

Repeatedly waits for the accept loop to place a connection in nextSocket,
takes it, and runs an interpreter session on it with interpreter_mut
released. Leaves the loop when shuttingdown is set, either by another
thread or because this thread's own session returned SHUTDOWN_SERVER.

@param arg  A heap allocated thread_arg; freed here before returning.
@return always 0.
*/
void * interpreterThread(void * arg) {
thread_arg * a = arg;
int i = a->id;
// printf("I am thread %d\n",i);
pthread_mutex_lock(&interpreter_mut);
while(!shuttingdown) {
// Wait (with the mutex released) until a connection is available or the
// server is shutting down.
while(!nextSocket) {
if(shuttingdown) { break; }
pthread_cond_wait(&interpreter_cond, &interpreter_mut);
}
if(shuttingdown) { break; }
// Take the connection and tell the accept loop that the slot is free.
interpreterStates[i] = INTERPRETER_BUSY;
interpreterConnections[i] = nextSocket;
nextSocket = 0;
pthread_cond_signal(&nextSocket_cond);
// The session itself runs without the mutex, so sessions on different
// threads run concurrently.
pthread_mutex_unlock(&interpreter_mut);
// printf("Opening connection for FILE %x\n",interpreterConnections[i]);
int ret = openInterpreter(interpreterConnections[i],
interpreterConnections[i],a->hash);
pthread_mutex_lock(&interpreter_mut);
// NOTE(review): setting shuttingdown does not wake workers that are
// blocked in pthread_cond_wait above; in this file they are only woken
// by the broadcast in startServer after its accept loop ends.
if(ret == SHUTDOWN_SERVER) {
shuttingdown = 1;
}
interpreterStates[i] = INTERPRETER_IDLE;
fclose(interpreterConnections[i]);
interpreterConnections[i] = 0;
}
pthread_mutex_unlock(&interpreter_mut);
free(a);
return 0;
}
/**
 * Run one interpreter session: read commands from `in` line by line and
 * write prompts and results to `out` until end of input, "!exit" or
 * "!shutdown".
 *
 * Lines starting with '!' are meta commands (debug, regions, autocommit,
 * parseTuple, parseExpression, exit, shutdown).  Other lines are "create",
 * "query", "insert" or "delete" statements.  With autocommit on (the
 * default) each modifying statement runs in its own transaction; with it
 * off, one transaction spans the statements until autocommit is re-enabled
 * or the session ends.
 *
 * Sessions run concurrently on several threads, so lines are tokenized
 * without strtok().
 *
 * @param in   Stream to read commands from.
 * @param out  Stream to write prompts and results to (may be the same
 *             stream as `in`).
 * @param hash Catalog hash that maps table names to recordid tuples.
 * @return SHUTDOWN_SERVER if the session ended with "!shutdown", else 0.
 */
int openInterpreter(FILE * in, FILE * out, recordid hash) {
  char * line = NULL;
  size_t len = 0;
  int AUTOCOMMIT = 1;
  int debug = 0;
  int xid = -1;
  int ret = 0;

  fprintf(out, "> ");
  fflush(out);   /* make the prompt visible before blocking on input */
  if(!AUTOCOMMIT) { xid = Tbegin(); }

  /* getline returns ssize_t; compare it directly instead of storing it in
     an unsigned variable. */
  while(getline(&line, &len, in) != -1) {
    if(line[0] == '!') {
      if(!strncmp(line+1,"debug",strlen("debug"))) {
        debug = !debug;
        if(debug)
          fprintf(out, "Enabling debugging\n");
        else
          fprintf(out, "Disabling debugging\n");
      } else if(!strncmp(line+1,"regions",strlen("regions"))) {
        fprintf(out, "Boundary tag pages:\n");
        pageid_t pid = REGION_FIRST_TAG;
        boundary_tag tag;
        TregionReadBoundaryTag(-1,pid,&tag);
        int done = 0;
        while(!done) {
          fprintf(out, "\tpageid=%lld\ttype=%d\tsize=%d\n", (long long)pid, tag.allocation_manager, tag.size);
          if(tag.size == UINT32_MAX) { fprintf(out, "\t[EOF]\n"); }
          int err = TregionNextBoundaryTag(-1,&pid,&tag,0);
          if(!err) { done = 1; }
        }
      } else if(!strncmp(line+1,"autocommit",strlen("autocommit"))) {
        if(AUTOCOMMIT) {
          // we're not in a transaction
          fprintf(out, "Disabling autocommit\n");
          AUTOCOMMIT = 0;
          xid = Tbegin();
        } else {
          fprintf(out, "Enabling autocommit\n");
          AUTOCOMMIT = 1;
          Tcommit(xid);
        }
      } else if(!strncmp(line+1,"parseTuple",strlen("parseTuple"))) {
        char * c = line + 1 + strlen("parseTuple");
        char ** toks = parseTuple(&c);
        if(toks) {
          for(int i = 0; toks[i]; i++) {
            fprintf(out, "col %d = ->%s<-\n", i, toks[i]);
            free(toks[i]);
          }
          free(toks);
        } else {
          /* parseTuple returns 0 on a syntax error. */
          fprintf(out, "parse error\n");
        }
        fprintf(out, "trailing stuff: %s", c);
      } else if(!strncmp(line+1,"parseExpression",
                         strlen("parseExpression"))) {
        char * c = line + 1 + strlen("parseExpression");
        lladdIterator_t * it = parseExpression(xid, hash, &c);
        /* Only the parse is of interest; release the iterator tree. */
        if(it) { Titerator_close(xid, it); }
      } else if(!strncmp(line+1,"exit",strlen("exit"))) {
        break;
      } else if(!strncmp(line+1,"shutdown",strlen("shutdown"))) {
        ret = SHUTDOWN_SERVER;
        break;
      }
    } else {
      if(!strncmp(line,"create",strlen("create"))) {
        char * linecopy = strdup(line+strlen("create"));
        /* Table name: the first word after "create". */
        char * tablename = linecopy + strspn(linecopy, " \r\n");
        tablename[strcspn(tablename, " \r\n")] = '\0';
        if(!*tablename) {
          fprintf(out, "parse error\n");
        } else {
          byte* delme;
          if(AUTOCOMMIT) xid = Tbegin();
          if(-1 == ThashLookup(xid, hash, (byte*)tablename,strlen(tablename)+1,&delme)) {
            fprintf(out, "Creating table %s\n", tablename);
            recordid newHash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
            char * tpl = tplRid(newHash);
            char * insert = 0;
            /* Register the new table in the catalog through the normal
               insert path. */
            if(asprintf(&insert, "insert TABLES %s,%s\n", tablename, tpl) >= 0) {
              executeInsert(xid, hash, insert);
              free(insert);
            }
            free(tpl);
          } else {
            fprintf(out, "Table already exists: %s\n", tablename);
            free(delme);
          }
          if(AUTOCOMMIT) Tcommit(xid);
        }
        free(linecopy);
      } else if(!strncmp(line,"query",strlen("query"))) {
        /* NOTE(review): with autocommit on, queries run with whatever xid
           holds (-1 or an already committed id) -- confirm this is intended. */
        char * linecopy = strdup(line+strlen("query"));
        char ** result = executeQuery(xid, hash, linecopy);
        if(result) {
          for(int i = 0; result[i]; i++) {
            fprintf(out, "%s\n", result[i]);
            free(result[i]);
          }
          free(result);
        } else {
          fprintf(out, "query failed\n");
        }
        free(linecopy);
      } else if(!strncmp(line,"insert",strlen("insert"))) {
        if(AUTOCOMMIT) xid = Tbegin();
        executeInsert(xid, hash, line);
        if(AUTOCOMMIT) Tcommit(xid);
      } else if(!strncmp(line,"delete",strlen("delete"))) {
        if(AUTOCOMMIT) xid = Tbegin();
        executeDelete(xid, hash, line);
        if(AUTOCOMMIT) Tcommit(xid);
      } else {
        fprintf(out, "I don't understand...\n");
      }
    }
    fprintf(out, "> ");
    fflush(out);
  }
  if(!AUTOCOMMIT) { Tcommit(xid); }
  free(line);
  fprintf(out, "\n");
  return ret;
}
/**
 * Run the TCP server: bind and listen on 127.0.0.1, start the interpreter
 * thread pool, and hand each accepted connection to a worker through the
 * one-slot nextSocket hand-off.  Returns after a session has requested
 * shutdown and all workers have exited.
 *
 * NOTE(review): `addr` is passed to getaddrinfo() as the service, so it is
 * a port number or service name even though the usage text in main() says
 * "addr:port".  Shutdown is only noticed when the next connection arrives,
 * because the loop blocks in accept().
 *
 * @param addr Port (service) to listen on.
 * @param hash Catalog hash passed to every interpreter session.
 */
void startServer(char * addr, recordid hash) {
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int sfd = -1, s;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_INET;       /* IPv4 only */
  hints.ai_socktype = SOCK_STREAM; /* TCP stream socket */
  hints.ai_flags = AI_PASSIVE;
  hints.ai_protocol = 0;           /* Any protocol */

  printf("Listening on socket\n");
  s = getaddrinfo("127.0.0.1", addr, &hints, &result);
  if (s != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
    exit(EXIT_FAILURE);
  }

  /* getaddrinfo() returns a list of address structures.  Try each one until
     bind() succeeds; close the socket of every failed attempt. */
  for (rp = result; rp != NULL; rp = rp->ai_next) {
    sfd = socket(rp->ai_family, rp->ai_socktype,
                 rp->ai_protocol);
    if (sfd == -1)
      continue;
    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0)
      break;                  /* Success */
    close(sfd);
  }
  if (rp == NULL) {               /* No address succeeded */
    perror("Could not bind\n");
    exit(EXIT_FAILURE);
  }
  freeaddrinfo(result);           /* No longer needed */

  int err = listen(sfd, MAX_CONN_QUEUE);
  if(err == -1) {
    perror("Couldn't listen()");
    close(sfd);
    return;
  }

  printf("Spawning servers.\n"); fflush(NULL);
  for(int i = 0; i < THREAD_POOL; i++) {
    thread_arg * arg = malloc(sizeof(thread_arg));
    arg->id = i;
    arg->hash = hash;
    interpreterStates[i] = INTERPRETER_IDLE;
    interpreterConnections[i] = 0;
    if(pthread_create(&interpreters[i], 0, interpreterThread, arg) != 0) {
      /* Joining a thread that was never created is undefined; give up. */
      fprintf(stderr, "Couldn't create interpreter thread\n");
      exit(EXIT_FAILURE);
    }
  }
  printf("Ready for connections.\n"); fflush(NULL);

  /* Accept connections and hand them to the worker threads. */
  for (;;) {
    struct sockaddr peer_addr;
    /* accept() reads this as the size of peer_addr, so it must be set
       before every call. */
    socklen_t peer_addr_len = sizeof(peer_addr);
    int fd = accept(sfd, &peer_addr, &peer_addr_len);
    if(fd == -1) {
      perror("Error accepting connection");
      continue;
    }
    FILE * sock = fdopen(fd, "w+");
    if(!sock) {
      perror("Error opening connection stream");
      close(fd);
      continue;
    }

    pthread_mutex_lock(&interpreter_mut);
    /* Wait until the previous connection has been picked up. */
    while(nextSocket) {
      pthread_cond_wait(&nextSocket_cond, &interpreter_mut);
    }
    nextSocket = sock;
    pthread_cond_signal(&interpreter_cond);
    /* Read the flag while the mutex that protects it is still held. */
    int stop = shuttingdown;
    pthread_mutex_unlock(&interpreter_mut);
    if(stop) {
      break;
    }
  }

  pthread_cond_broadcast(&interpreter_cond);
  for(int i = 0; i < THREAD_POOL; i++) {
    pthread_join(interpreters[i],0);
  }
  /* A connection handed off during shutdown is never picked up by a
     worker; close it here.  All workers have exited, so no lock is needed. */
  if(nextSocket) {
    fclose(nextSocket);
    nextSocket = 0;
  }
  close(sfd);
}
/**
 * Entry point.
 *
 * Opens the store, creating the root record and the TABLES catalog on first
 * use, and then runs one of three modes:
 *   - "--socket <port>": serve interpreter sessions over TCP,
 *   - "<filename>":      run the commands in the file,
 *   - no arguments:      run an interactive session on stdin/stdout.
 *
 * @return 0 normally, 1 if the command file cannot be opened.
 */
int main(int argc, char * argv[]) {
  Tinit();
  ReferentialAlgebra_init();
  recordid rootEntry;
  recordid hash;
  int xid = Tbegin();
  if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
    printf("Creating new store\n");
    rootEntry = Talloc(xid, sizeof(recordid));
    assert(rootEntry.page == ROOT_RECORD.page);
    assert(rootEntry.slot == ROOT_RECORD.slot);
    hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
    Tset(xid, rootEntry, &hash);
    /* The catalog lists itself under the name TABLES. */
    char * tpl = tplRid(hash);
    ThashInsert(xid,hash,(byte*)"TABLES", strlen("TABLES")+1, (byte*)tpl, strlen(tpl)+1);
    free(tpl);
  } else {
    printf("Opened existing store\n");
    rootEntry.page = ROOT_RECORD.page;
    rootEntry.slot = ROOT_RECORD.slot;
    rootEntry.size = sizeof(recordid);
    Tread(xid, rootEntry, &hash);
  }
  Tcommit(xid);

  if(argc == 3) { // listen on socket
    if(strcmp("--socket", argv[1])) {
      printf("usage:\n\n%s\n%s <filename>\n%s --socket addr:port\n",
             argv[0],argv[0],argv[0]);
    } else {
      startServer(argv[2], hash);
    }
    printf("Shutting down...\n");
  } else {
    FILE * in = stdin;
    if(argc == 2) {
      in = fopen(argv[1], "r");
      if(!in) {
        printf("Couldn't open input file.\n");
        Tdeinit();
        return 1;
      }
    }
    openInterpreter(in, stdout, hash);
    if(in != stdin) { fclose(in); }
  }
  Tdeinit();
  return 0;
}