diff --git a/configure.in b/configure.in
index eb96073..c49e3b1 100644
--- a/configure.in
+++ b/configure.in
@@ -225,6 +225,7 @@ AC_CONFIG_FILES([Makefile
  src/apps/cht/Makefile
  src/apps/readOnlyHash/Makefile
  src/apps/cyrus/Makefile
+ src/apps/referential/Makefile
  src/libdfa/Makefile
  src/stasis/Makefile
  src/pobj/Makefile
diff --git a/src/apps/Makefile.am b/src/apps/Makefile.am
index 06c6009..4e8d371 100644
--- a/src/apps/Makefile.am
+++ b/src/apps/Makefile.am
@@ -1 +1 @@
-SUBDIRS = cht readOnlyHash #cyrus
+SUBDIRS = cht readOnlyHash referential #cyrus
diff --git a/src/apps/referential/CMakeLists.txt b/src/apps/referential/CMakeLists.txt
new file mode 100644
index 0000000..448229f
--- /dev/null
+++ b/src/apps/referential/CMakeLists.txt
@@ -0,0 +1,5 @@
+ADD_LIBRARY(referential algebra.c dml.c)
+SET(COMMON_LIBRARIES referential ${COMMON_LIBRARIES})
+IF(CHECK_LIBRARY)
+  CREATE_CHECK_OPT(toplevel ${CMAKE_CURRENT_SOURCE_DIR}/test-script.ref)
+ENDIF(CHECK_LIBRARY)
\ No newline at end of file
diff --git a/src/apps/referential/Makefile.am b/src/apps/referential/Makefile.am
new file mode 100644
index 0000000..7c64bd0
--- /dev/null
+++ b/src/apps/referential/Makefile.am
@@ -0,0 +1,4 @@
+LDADD=$(top_builddir)/src/stasis/libstasis.la $(top_builddir)/src/libdfa/librw.la
+toplevel_SOURCES=toplevel.c algebra.c dml.c
+noinst_PROGRAMS=toplevel
+AM_CFLAGS=-g -Wall -pedantic -std=gnu99
diff --git a/src/apps/referential/algebra.c b/src/apps/referential/algebra.c
new file mode 100644
index 0000000..f96b109
--- /dev/null
+++ b/src/apps/referential/algebra.c
@@ -0,0 +1,802 @@
+#define _GNU_SOURCE
+// NOTE(review): header names reconstructed from the identifiers used below;
+// confirm the Stasis header path against the source tree.
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stasis/transactional.h>
+#include "algebra.h"
+#include <assert.h>
+#include <ctype.h>
+#include <errno.h>
+#include <limits.h>
+#define SELECT_ITERATOR  (USER_DEFINED_ITERATOR+1)
+#define PROJECT_ITERATOR (USER_DEFINED_ITERATOR+2)
+#define KEYVAL_ITERATOR  (USER_DEFINED_ITERATOR+3)
+#define JOIN_ITERATOR    (USER_DEFINED_ITERATOR+4)
+
+/* Unimplemented table-scan iterator, kept as a placeholder:
+static void ts_close(int xid, void * it) {
+
+}
+static int ts_next(int xid, void * it) {
+
+}
+static int ts_tryNext(int xid, void * it) {
+
+}
+static int ts_key(int xid, void * it, byte ** key) {
+
+}
+static int ts_value(int xid, void * it, byte ** val) {
+
+}
+static void ts_tupleDone(int xid, void * it) {
+
+}
+static void ts_releaseLock(int xid, void *it) {
+
+} */
+/*static const lladdIterator_def_t ts_it = {
+  ts_close, ts_next, ts_tnext, ts_key, ts_value, ts_tupleDone, noopTupDone
+  }; */
+
+
+/**
+   Split a string on any of the characters in delim.
+
+   @param in     string to split; it is not modified.
+   @param freeme receives a strdup()ed copy of in that backs the returned
+                 tokens.  The caller must free() it.
+   @param count  receives the number of tokens found.
+   @param delim  set of delimiter characters.
+   @return a malloc()ed, NULL-terminated array of pointers into *freeme.  The
+           caller frees the array itself, not the individual entries.
+*/
+char ** split(char * in, char ** freeme, int* count, char * delim) {
+  *freeme = strdup(in);
+  *count = 0;
+  char ** ret = 0;
+  // strtok_r rather than strtok: toplevel.c runs a pool of interpreter
+  // threads, and strtok() keeps its position in hidden static state.
+  char * saveptr = 0;
+  for(char * tok = strtok_r(*freeme, delim, &saveptr);
+      tok;
+      tok = strtok_r(NULL, delim, &saveptr)) {
+    (*count)++;
+    ret = realloc(ret, sizeof(char*) * *count);
+    ret[(*count)-1] = tok;
+  }
+  ret = realloc(ret, sizeof(char*) * ((*count)+1));
+  ret[*count] = 0;
+  return ret;
+}
+
+/**
+   Render a recordid as the text tuple "page,slot,size".
+
+   @return a malloc()ed string (caller frees), or NULL if asprintf() fails.
+*/
+char * tplRid(recordid rid) {
+  char * ret;
+  if(asprintf(&ret, "%d,%d,%lld", rid.page, rid.slot, (long long)rid.size) < 0) {
+    return 0;
+  }
+  return ret;
+}
+/**
+   Parse a "page,slot,size" tuple (as produced by tplRid()) into a recordid.
+   Asserts that the tuple has exactly three columns, and aborts if strtol()
+   reports an error through errno.
+*/
+recordid ridTpl(char * tpl) {
+  int count;
+  char * freeme;
+  char ** tok = split(tpl, &freeme, &count, ", ");
+  recordid ret;
+  errno = 0;
+  assert(count == 3);
+  ret.page = strtol(tok[0],NULL,10);
+  ret.slot = strtol(tok[1],NULL,10);
+  ret.size = strtol(tok[2],NULL,10);
+  if(errno) {
+    perror("Couldn't parse rid");
+    abort();
+  }
+  free(freeme);
+  free(tok);
+  return ret;
+}
+
+/**
+   Deep-copy a NULL-terminated tuple.  The result is released with tplFree().
+*/
+char ** tplDup(char ** tup) {
+  int n = 0;
+  while(tup[n]) { n++; }  // n = number of non-null entries
+  char ** ret = malloc(sizeof(char*) * (n+1));
+  for(int i = 0; i < n; i++) {
+    ret[i] = strdup(tup[i]);
+  }
+  ret[n] = 0;
+  return ret;
+}
+/**
+   Free a NULL-terminated tuple and every string in it.  A NULL tuple is a
+   no-op; parseTuple() returns NULL on a parse error, and that value reaches
+   this function via ReferentialAlgebra_Select()/Project().
+*/
+void tplFree(char ** tup) {
+  if(!tup) { return; }
+  for(int i = 0; tup[i]; i++) {
+    free(tup[i]);
+  }
+  free(tup);
+}
+
+/**
+   Look tablename up in the catalog hash and open an iterator over the hash
+   table whose recordid is stored there.
+
+   @return the iterator, or NULL (after printing a message) if the catalog has
+           no entry for tablename.
+*/
+lladdIterator_t* ReferentialAlgebra_OpenTableScanner(int xid, recordid catalog,
+                                                     char * tablename) {
+  char * table;
+  // Kept as an int so the comparison against -1 is a plain signed test, as
+  // in dml.c.
+  int sz = ThashLookup(xid, catalog, (byte*)tablename, strlen(tablename)+1, (byte**)&table);
+  if(sz == -1) {
+    printf("Unknown table %s\n", tablename);
+    return 0;
+  }
+  assert((size_t)sz == strlen(table)+1);
+  recordid tableRid = ridTpl(table);
+  lladdIterator_t * it = ThashGenericIterator(xid, tableRid);
+  free(table);
+  return it;
+}
+typedef struct select_predicate {
+  char ** vals;
+  int count;
+  void * freeme;
+} select_predicate;
+
+/** State of a SELECT iterator: the child iterator and the pattern tuple. */
+typedef struct select_impl {
+  lladdIterator_t * it;
+  char ** p;
+} select_impl;
+
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                        KEY VALUE TABLE FORMAT                              ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+
+/** State of a KEYVAL iterator.  catted caches "key,value" for the current
+    tuple and is owned by the iterator. */
+typedef struct kv_impl {
+  char * catted;
+  lladdIterator_t * it;
+} kv_impl;
+
+/**
+   Wrap it in an iterator whose key and value are both the string
+   "key,value" (or just "key" when the value is the empty string).  The
+   wrapper closes it when it is closed.
+*/
+lladdIterator_t* ReferentialAlgebra_KeyValCat(int xid, lladdIterator_t * it) {
+  kv_impl * kv = malloc(sizeof(kv_impl));
+  kv->catted = 0;
+  kv->it = it;
+  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
+  new_it->type = KEYVAL_ITERATOR;
+  new_it->impl = kv;
+  return new_it;
+}
+
+/* Drop the cached concatenation, if any. */
+static void kv_dropCatted(kv_impl * kv) {
+  free(kv->catted);
+  kv->catted = 0;
+}
+static void kv_close(int xid, void * it) {
+  kv_impl * kv = it;
+  Titerator_close(xid, kv->it);
+  kv_dropCatted(kv);
+  free(kv);
+}
+static int kv_next(int xid, void * it) {
+  kv_impl * kv = it;
+  kv_dropCatted(kv);
+  return Titerator_next(xid, kv->it);
+}
+static int kv_tryNext(int xid, void * it) {
+  kv_impl * kv = it;
+  kv_dropCatted(kv);
+  return Titerator_tryNext(xid, kv->it);
+}
+/* Build kv->catted from the child's current key and value.
+   Returns the size of the result including its NUL terminator.  The first
+   call used to return strlen() while the cached path returned strlen()+1;
+   both now report the same size. */
+static int mkCatted(int xid, kv_impl * kv) {
+  char * key = 0;
+  char * val = 0;
+  Titerator_key(xid, kv->it, (byte**)&key);
+  Titerator_value(xid, kv->it, (byte**)&val);
+
+  if(!strlen(val)) {
+    kv->catted = strdup(key);
+  } else {
+    size_t sz = strlen(key) + 1 + strlen(val) + 1;
+    kv->catted = malloc(sz);
+    snprintf(kv->catted, sz, "%s,%s", key, val);
+  }
+  return strlen(kv->catted)+1;
+}
+/* Shared body of kv_key() and kv_value(): both expose the same string. */
+static int kv_catted(int xid, kv_impl * kv, byte ** out) {
+  int sz = kv->catted ? (int)strlen(kv->catted)+1 : mkCatted(xid, kv);
+  *out = (byte*)kv->catted;
+  return sz;
+}
+static int kv_key(int xid, void * it, byte ** key) {
+  return kv_catted(xid, it, key);
+}
+static int kv_value(int xid, void * it, byte ** val) {
+  return kv_catted(xid, it, val);
+}
+static void kv_tupleDone(int xid, void * it) {
+  kv_impl * kv = it;
+  Titerator_tupleDone(xid, kv->it);
+}
+static void kv_releaseLock(int xid, void *it) {
+  kv_impl * kv = it;
+  Titerator_releaseLock(xid, kv->it);
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                                 SELECT                                     ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+/**
+   Build an iterator that yields only the tuples of it matching pred.
+
+   @param it   child iterator.  On success the new iterator owns it; on
+               failure it is left open for the caller to close.
+   @param pred NULL-terminated pattern tuple; "*" matches any column value.
+               Ownership always passes to this function.
+   @return the new iterator, or NULL if it or pred is NULL.
+*/
+lladdIterator_t* ReferentialAlgebra_Select(int xid, lladdIterator_t * it, char ** pred) {
+  if(!it || !pred) {
+    tplFree(pred);
+    return 0;
+  }
+  select_impl * s = malloc(sizeof(select_impl));
+  s->p = pred;
+  s->it = it;
+  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
+  new_it->type = SELECT_ITERATOR;
+  new_it->impl = s;
+  return new_it;
+}
+
+/* Return 1 iff tup, split on ", ", has exactly as many columns as pred and
+   every column equals the corresponding pattern entry or that entry is "*". */
+static int matchPredicate(const char * tup, char ** pred) {
+  int predcount = 0;
+  while(pred[predcount]) { predcount++; }
+
+  char * tupcpy = strdup(tup);
+  char ** tok = malloc((predcount+1) * sizeof(char*));
+  const char * DELIM = ", ";
+  char * saveptr = 0;
+  int colcount = 0;
+  int match = 1;
+
+  for(char * ths = strtok_r(tupcpy, DELIM, &saveptr);
+      ths;
+      ths = strtok_r(NULL, DELIM, &saveptr)) {
+    if(colcount == predcount) {  // tuple has more columns than the pattern
+      match = 0;
+      break;
+    }
+    tok[colcount++] = ths;
+  }
+  if(colcount != predcount) { match = 0; }
+
+  for(int i = 0; match && i < predcount; i++) {
+    if(strcmp(pred[i],"*") && strcmp(pred[i], tok[i])) {
+      match = 0;
+    }
+  }
+  free(tupcpy);
+  free(tok);
+  return match;
+}
+static void s_close(int xid, void * it) {
+  select_impl * impl = it;
+  Titerator_close(xid, impl->it);
+  tplFree(impl->p);
+  free(impl);
+}
+/* Advance the child until a tuple matches; rejected tuples are released
+   with Titerator_tupleDone(). */
+static int s_next(int xid, void * it) {
+  select_impl * impl = it;
+  while(Titerator_next(xid,impl->it)) {
+    char * val;
+    Titerator_value(xid, impl->it, (byte**)&val);
+    if(matchPredicate(val, impl->p)) {
+      return 1;
+    }
+    Titerator_tupleDone(xid, impl->it);
+  }
+  return 0;
+}
+static int s_tryNext(int xid, void * it) {
+  abort();  // not implemented
+}
+static int s_key(int xid, void * it, byte ** key) {
+  return Titerator_key(xid, ((select_impl*)it)->it, key);
+}
+static int s_value(int xid, void * it, byte ** val) {
+  return Titerator_value(xid, ((select_impl*)it)->it, val);
+}
+static void s_tupleDone(int xid, void * it) {
+  Titerator_tupleDone(xid, ((select_impl*)it)->it);
+}
+static void s_releaseLock(int xid, void *it) {
+  Titerator_releaseLock(xid, ((select_impl*)it)->it);
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                                 PROJECT                                    ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+
+/** State of a PROJECT iterator.  cols holds count column indices; val caches
+    the projected string for the current tuple and is owned by the iterator. */
+typedef struct project_impl {
+  int count;
+  short * cols;
+  lladdIterator_t * it;
+  char * val;
+} project_impl;
+
+/**
+   Build an iterator that keeps only the listed columns of each tuple of it.
+
+   @param it      child iterator.  On success the new iterator owns it; on
+                  failure it is left open for the caller to close.
+   @param project NULL-terminated tuple of decimal column indices.  Always
+                  consumed (freed) by this function.
+   @return the new iterator, or NULL if it or project is NULL or a column
+           descriptor is not a non-negative integer that fits in a short.
+*/
+lladdIterator_t* ReferentialAlgebra_Project(int xid, lladdIterator_t * it, char ** project) {
+  if(!it || !project) {
+    tplFree(project);
+    return 0;
+  }
+  int projectcount = 0;
+  while(project[projectcount]) { projectcount++; }
+
+  project_impl * p = malloc(sizeof(project_impl));
+  p->count = projectcount;
+  p->cols = malloc(sizeof(short) * projectcount);
+  p->it = it;
+  p->val = 0;
+  for(int i = 0; i < projectcount; i++) {
+    errno = 0;
+    char * eos;
+    long col = strtol(project[i], &eos, 10);
+    // Indices are stored in a short and used to index the split tuple, so a
+    // negative or oversized value would read out of bounds in p_value().
+    if(*eos != '\0' || errno || col < 0 || col > SHRT_MAX) {
+      printf("Couldn't parse column descriptor\n");
+      tplFree(project);
+      free(p->cols);
+      free(p);
+      return 0;
+    }
+    p->cols[i] = (short)col;
+  }
+  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
+  new_it->type = PROJECT_ITERATOR;
+  new_it->impl = p;
+  tplFree(project);
+  return new_it;
+}
+
+static void p_close(int xid, void * it) {
+  project_impl * impl = it;
+  Titerator_close(xid, impl->it);
+  free(impl->val);  // still set if the consumer never called tupleDone
+  free(impl->cols);
+  free(impl);
+}
+static int p_next(int xid, void * it) {
+  project_impl * impl = it;
+  return Titerator_next(xid, impl->it);
+}
+static int p_tryNext(int xid, void * it) {
+  project_impl * impl = it;
+  return Titerator_tryNext(xid, impl->it);
+}
+static int p_key(int xid, void * it, byte ** key) {
+  project_impl * impl = it;
+  return Titerator_key(xid,impl->it,key);
+}
+/* Produce the projected, comma-separated tuple for the current position.
+   The string is cached in impl->val until p_tupleDone().  If the child has
+   no value, *val is left untouched and nothing is cached (the old code
+   cached an uninitialised pointer here, which p_tupleDone() later freed). */
+static int p_value(int xid, void * it, byte ** val) {
+  project_impl * impl = it;
+  if(impl->val) {
+    *val = (byte*)impl->val;
+    return 1;
+  }
+  byte * in_val;
+  int ret = Titerator_value(xid,impl->it,&in_val);
+  if(!ret) { return ret; }
+
+  char * freeme;
+  int count;
+  char ** tok = split((char*)in_val, &freeme, &count, ", ");
+
+  size_t len = 0;
+  char * out = malloc(1);
+  out[0] = '\0';
+  for(int i = 0; i < impl->count; i++) {
+    if(impl->cols[i] < count) {
+      const char * col = tok[impl->cols[i]];
+      size_t collen = strlen(col);
+      out = realloc(out, len + (i ? 1 : 0) + collen + 1);
+      if(i) { out[len++] = ','; }
+      memcpy(out + len, col, collen + 1);
+      len += collen;
+    } else {
+      printf("Tuple is too short for pattern.\n");
+    }
+  }
+  free(freeme);
+  free(tok);
+
+  impl->val = out;
+  *val = (byte*)out;
+  return ret;
+}
+static void p_tupleDone(int xid, void * it) {
+  project_impl * impl = it;
+  free(impl->val);
+  impl->val = 0;
+  Titerator_tupleDone(xid,impl->it);
+}
+static void p_releaseLock(int xid, void *it) {
+  project_impl * impl = it;
+  Titerator_releaseLock(xid,impl->it);
+}
+
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                        JOIN (naive nested loops)                           ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+
+/** State of a JOIN iterator.  The inner relation is materialised when the
+    iterator is built: inner_tpls[i] is the split form of inner_strs[i], and
+    freethese[i] is the buffer backing inner_tpls[i].  inner_tpls is
+    NULL-terminated.  outer_tpl / freeouter are the split form of the current
+    outer tuple; outer_str points at the outer iterator's own value. */
+typedef struct join_impl {
+  char *** inner_tpls;
+  char ** inner_strs;
+  char ** freethese;
+  lladdIterator_t * outer_it;
+  lladdIterator_t * inner_it;
+  char ** pred;
+  int inner_pos;
+  char ** outer_tpl;
+  char * outer_str;
+  char * freeouter;
+} join_impl;
+
+/* Copy the characters in [start, end) into a fresh NUL-terminated string. */
+static char * dupSpan(const char * start, const char * end) {
+  size_t n = end - start;
+  char * s = calloc(n+1, sizeof(char));
+  memcpy(s, start, n);
+  return s;
+}
+
+/* Evaluate every "<col1><op><col2>" entry of pred against tup1 (outer) and
+   tup2 (inner).  Returns 1 iff all entries hold.  Only "=" is evaluated; any
+   other operator is accepted without being checked, as before. */
+static int matchComparator(char ** tup1,
+                           char ** tup2,
+                           char ** pred) {
+  int colcount1 = 0;
+  int colcount2 = 0;
+  while(tup1[colcount1]) { colcount1++; }
+  while(tup2[colcount2]) { colcount2++; }
+
+  int match = 1;
+  for(int col = 0; pred[col] && match; col++) {
+    // The <ctype.h> functions need an unsigned char value.
+    const char * lhs_end = pred[col];
+    while(isalnum((unsigned char)*lhs_end)) { lhs_end++; }
+    char * lhs = dupSpan(pred[col], lhs_end);
+
+    const char * op_start = lhs_end;
+    while(isblank((unsigned char)*op_start)) { op_start++; }
+    const char * op_end = op_start;
+    // Stop at the terminator too: an entry with no operator (e.g. "0") used
+    // to make this scan run off the end of the string.
+    while(*op_end && !(isblank((unsigned char)*op_end) || isalnum((unsigned char)*op_end))) {
+      op_end++;
+    }
+    char * op = dupSpan(op_start, op_end);
+
+    const char * rhs_start = op_end;
+    while(isblank((unsigned char)*rhs_start)) { rhs_start++; }
+    const char * rhs_end = rhs_start;
+    while(isalnum((unsigned char)*rhs_end)) { rhs_end++; }
+    char * rhs = dupSpan(rhs_start, rhs_end);
+
+    long col1 = strtol(lhs, NULL, 10);
+    long col2 = strtol(rhs, NULL, 10);
+
+    if(colcount1 <= col1 || colcount2 <= col2) {
+      printf("not enough columns for join!\n");
+      match = 0;
+    } else if(!strcmp(op,"=")) {
+      if(strcmp(tup1[col1], tup2[col2])) {
+        match = 0;
+      }
+    }
+    free(rhs);
+    free(lhs);
+    free(op);
+  }
+  return match;
+}
+
+/**
+   Build a nested-loops join of outer_it and inner_it.  The inner iterator is
+   read to completion here and kept in memory.  The result is wrapped with
+   ReferentialAlgebra_KeyValCat(), so each joined tuple reads "outer,inner".
+
+   @param pred NULL-terminated tuple of comparisons such as "0=1".  Owned by
+               the iterator on success; freed here if an input iterator is
+               NULL.
+   @return the new iterator, or NULL if pred or either input is NULL.  On
+           failure the input iterators are left open for the caller to close.
+*/
+lladdIterator_t* ReferentialAlgebra_Join(int xid,
+                                         char ** pred,
+                                         lladdIterator_t * outer_it,
+                                         lladdIterator_t * inner_it) {
+  if(!(outer_it && inner_it)) {
+    if(pred) { tplFree(pred); }
+    return 0;
+  }
+  if(!pred) { return 0; }
+
+  join_impl * j = malloc(sizeof(join_impl));
+
+  j->inner_it = inner_it;
+  j->outer_it = outer_it;
+  j->pred = pred;
+  j->inner_pos = 0;
+  j->outer_tpl = 0;
+  // These two were left uninitialised, yet j_close() tests and frees
+  // freeouter.
+  j->outer_str = 0;
+  j->freeouter = 0;
+
+  j->inner_tpls = calloc(1, sizeof(char **));
+  j->inner_strs = calloc(1, sizeof(char *));
+  j->freethese  = calloc(1, sizeof(char *));
+  int i = 0;
+  while(Titerator_next(xid, inner_it)) {
+    char * in_val;
+    Titerator_value(xid, inner_it, (byte**)&in_val);
+    // Grow first so slot i and the terminator slot i+1 both exist.
+    j->inner_tpls = realloc(j->inner_tpls, sizeof(char **)*(i+2));
+    j->inner_strs = realloc(j->inner_strs, sizeof(char *)*(i+2));
+    j->freethese  = realloc(j->freethese,  sizeof(char *)*(i+2));
+    int count;
+    j->inner_tpls[i] = split(in_val, &j->freethese[i], &count, ", ");
+    j->inner_strs[i] = strdup(in_val);
+    j->inner_tpls[i+1] = 0;
+    j->inner_strs[i+1] = 0;
+    j->freethese[i+1] = 0;
+    Titerator_tupleDone(xid, inner_it);
+    i++;
+  }
+  lladdIterator_t * new_it = malloc(sizeof(lladdIterator_t));
+  new_it->type = JOIN_ITERATOR;
+  new_it->impl = j;
+  return ReferentialAlgebra_KeyValCat(xid,new_it);
+}
+
+
+static void j_close(int xid, void * it) {
+  join_impl * j = it;
+  Titerator_close(xid,j->outer_it);
+  Titerator_close(xid,j->inner_it);
+  for(int i = 0; j->inner_tpls[i]; i++) {
+    // inner_tpls[i] points into freethese[i], so free the array only, not
+    // its entries (i.e. no tplFree here).
+    free(j->freethese[i]);
+    free(j->inner_strs[i]);
+    free(j->inner_tpls[i]);
+  }
+  // The iterator owns the predicate (see ReferentialAlgebra_Join).
+  tplFree(j->pred);
+  free(j->inner_tpls);
+  free(j->inner_strs);
+  free(j->freethese);
+  free(j->outer_tpl);
+  free(j->freeouter);
+  free(j);
+}
+/* Advance to the next (outer, inner) pair that satisfies the predicate.
+   NOTE(review): j_tupleDone() drops the current outer tuple, so a consumer
+   that calls tupleDone after every result (as executeQuery() does) sees at
+   most one inner match per outer tuple.  Confirm whether that is intended. */
+static int j_next(int xid, void * it) {
+  join_impl * j = it;
+  while(1) {
+    if((!j->inner_tpls[j->inner_pos]) || (!j->outer_tpl)) {
+      j->inner_pos = 0;
+      Titerator_tupleDone(xid, j->outer_it);
+      if(!Titerator_next(xid, j->outer_it)) {
+        return 0;
+      }
+      // Release the previous outer tuple's split form before replacing it.
+      free(j->outer_tpl);
+      free(j->freeouter);
+      int count;
+      Titerator_value(xid, j->outer_it, (byte**)&j->outer_str);
+      j->outer_tpl = split((char*)j->outer_str, &j->freeouter, &count, ", ");
+    }
+    if(!j->inner_tpls[j->inner_pos]) {
+      // Empty inner relation: nothing can match.  Keep draining the outer
+      // side instead of handing a NULL tuple to matchComparator().
+      continue;
+    }
+    int matched = matchComparator(j->outer_tpl, j->inner_tpls[j->inner_pos], j->pred);
+    j->inner_pos++;
+    if(matched) {
+      return 1;
+    }
+  }
+}
+static int j_tryNext(int xid, void * it) {
+  abort();  // not implemented
+}
+static int j_key(int xid, void * it, byte ** key) {
+  join_impl * j = it;
+  *key = (byte*)j->outer_str;
+  return 1;
+}
+static int j_value(int xid, void * it, byte ** val) {
+  join_impl * j = it;
+  // j_next() has already stepped past the matching inner tuple.
+  *val = (byte*)j->inner_strs[j->inner_pos-1];
+  return 1;
+}
+static void j_tupleDone(int xid, void * it) {
+  join_impl * j = it;
+  free(j->outer_tpl);
+  free(j->freeouter);
+  j->outer_tpl = 0;
+  j->freeouter = 0;
+}
+static void j_releaseLock(int xid, void *it) {
+  // noop
+}
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                             INITIALIZATION                                 ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+
+/** Register the select, project, key/value and join iterator types. */
+void ReferentialAlgebra_init() {
+  lladdIterator_def_t select_def = {
+    s_close, s_next, s_tryNext, s_key, s_value, s_tupleDone, s_releaseLock
+  };
+  lladdIterator_register(SELECT_ITERATOR, select_def);
+  lladdIterator_def_t project_def = {
+    p_close, p_next, p_tryNext, p_key, p_value, p_tupleDone, p_releaseLock
+  };
+  lladdIterator_register(PROJECT_ITERATOR, project_def);
+  lladdIterator_def_t keyval_def = {
+    kv_close, kv_next, kv_tryNext, kv_key, kv_value, kv_tupleDone, kv_releaseLock
+  };
+  lladdIterator_register(KEYVAL_ITERATOR, keyval_def);
+  lladdIterator_def_t j_def = {
+    j_close, j_next, j_tryNext, j_key, j_value, j_tupleDone, j_releaseLock
+  };
+  lladdIterator_register(JOIN_ITERATOR, j_def);
+}
+
+
+//////////////////////////////////////////////////////////////////////////////////
+///                                                                            ///
+///                                 PARSER                                     ///
+///                                                                            ///
+//////////////////////////////////////////////////////////////////////////////////
+
+// Reserved characters: {, }, (, ), ",", " ".
+// Grammar:
+//     E = {s (tuple) E} | {p (tuple) E} | {j (tuple) E E} | TABLENAME
+//     tuple = V | V,tuple      (may also be empty)
+//     V = string of non-reserved characters
+//     TABLENAME = string of non-reserved characters
+
+#define LPAREN '{'
+#define RPAREN '}'
+#define LBRACKET '('
+#define RBRACKET ')'
+#define COMMA ','
+#define SPACE ' '
+#define STRING 's'
+#define EOS '\0'
+/*
+  @return one of the above.  If returns STRING, set *tok to be the new
+  token.  (*tok should be freed by caller in this case)
+ */
+int nextToken(char ** head, char ** tok);
+
+/**
+   Parse "(v1,v2,...)" starting at *head, advancing *head past what was read.
+
+   @return a NULL-terminated tuple to be released with tplFree(), or NULL on
+           a syntax error.  Malformed input used to trip an assert(), which
+           let any client abort the server.
+*/
+char** parseTuple(char ** head) {
+  char * mytok = 0;
+  int ret = nextToken(head, &mytok);
+  if(ret != LBRACKET) {
+    if(ret == STRING) { free(mytok); }
+    return 0;
+  }
+  char ** tok = calloc(1,sizeof(char*));
+  int count = 0;
+  while(1) {
+    ret = nextToken(head, &mytok);
+    if(ret == RBRACKET) {
+      break;
+    }
+    if(ret != STRING) {
+      tplFree(tok);
+      return 0;
+    }
+    count++;
+    tok = realloc(tok, sizeof(char*)*(count+1));
+    tok[count] = 0;
+    tok[count-1] = mytok;
+
+    ret = nextToken(head, &mytok);
+    if(ret == STRING) { free(mytok); }
+    if(ret == RBRACKET) {
+      break;
+    }
+    if(ret != COMMA) {
+      tplFree(tok);
+      return 0;
+    }
+  }
+  return tok;
+}
+
+/**
+   Parse one expression of the grammar above and build its iterator tree.
+
+   @param head in/out cursor into the query text.
+   @return the root iterator, or NULL (after printing a diagnostic) if the
+           expression is malformed or names an unknown table.  Iterators that
+           were already opened are closed on failure.
+*/
+lladdIterator_t * parseExpression(int xid, recordid catalog,
+                                  char **head) {
+  while(isblank((unsigned char)**head)) { (*head)++; }
+  if(**head != LPAREN) {
+    // Leaf: a table name.
+    char * tablename = 0;
+    int ret = nextToken(head, &tablename);
+    if(ret != STRING) { return 0; }
+    lladdIterator_t * scan =
+      ReferentialAlgebra_OpenTableScanner(xid, catalog, tablename);
+    free(tablename);
+    if(!scan) { return 0; }
+    return ReferentialAlgebra_KeyValCat(xid,scan);
+  }
+
+  (*head)++;
+  lladdIterator_t * it = 0;
+  char op = **head;
+  if(op == 's' || op == 'p') {
+    (*head)++;
+    char ** pred = parseTuple(head);
+    // The operand is parsed even when the tuple was bad, so the diagnostics
+    // printed for it are unchanged.
+    lladdIterator_t * it2 = parseExpression(xid, catalog, head);
+    if(pred) {
+      // Both constructors consume pred, whether or not they succeed.
+      it = (op == 's') ? ReferentialAlgebra_Select(xid, it2, pred)
+                       : ReferentialAlgebra_Project(xid, it2, pred);
+    }
+    if(it2 && !it) {
+      Titerator_close(xid,it2);
+    }
+  } else if(op == 'j') {
+    (*head)++;
+    char ** pred = parseTuple(head);
+    lladdIterator_t * outer = parseExpression(xid, catalog, head);
+    lladdIterator_t * inner = parseExpression(xid, catalog, head);
+    if(pred) {
+      it = ReferentialAlgebra_Join(xid, pred, outer, inner);
+    }
+    if(outer && !it) {
+      Titerator_close(xid,outer);
+    }
+    if(inner && !it) {
+      Titerator_close(xid,inner);
+    }
+  } else {
+    printf("Unknown operator\n");
+  }
+  if(!it) {
+    printf("parse error\n");
+    return 0;
+  }
+  char * foo = 0;
+  int ret = nextToken(head, &foo);
+  if(ret != RPAREN) {
+    // Missing closing brace: report it instead of asserting.
+    if(ret == STRING) { free(foo); }
+    Titerator_close(xid, it);
+    printf("parse error\n");
+    return 0;
+  }
+  return it;
+}
+
+/**
+   Read the next token at *head, skipping leading blanks and advancing *head.
+   A STRING token is a (possibly empty) run of alphanumerics, '*' and '=';
+   *tok then receives a calloc()ed copy that the caller must free.
+*/
+int nextToken(char ** head, char ** tok) {
+  while(isblank((unsigned char)**head)) { (*head)++; }
+  switch(**head) {
+  case LPAREN:
+  case RPAREN:
+  case LBRACKET:
+  case RBRACKET:
+  case COMMA:
+  case SPACE:
+    // Each of these token codes is the character itself.
+    return *(*head)++;
+  case EOS:
+    return EOS;
+  default: {
+    char * first = *head;
+    while(isalnum((unsigned char)**head)||**head=='*'||**head=='=') { (*head)++; }
+    size_t n = *head - first;
+    *tok = calloc(n + 1, sizeof(char));  // the extra zeroed byte terminates it
+    memcpy(*tok, first, n);
+    return STRING;
+  }
+  }
+}
+
+/**
+   Parse and run the query in line.
+
+   @return a NULL-terminated array of strdup()ed result tuples (the caller
+           frees each string and the array), or NULL if the query could not
+           be parsed.
+*/
+char ** executeQuery(int xid, recordid hash, char * line) {
+  char * lineptr = line;
+
+  lladdIterator_t * it = parseExpression(xid,hash,&lineptr);
+  if(!it) {
+    return 0;
+  }
+  char ** tuples = malloc(sizeof(char*));
+  int count = 0;
+  while(Titerator_next(xid, it)) {
+    char * val;
+    Titerator_value(xid,it,(byte**)&val);
+    count++;
+    tuples = realloc(tuples, sizeof(char*)*(count+1));
+    // Copy: the iterator's buffer is released by tupleDone.
+    tuples[count-1] = strdup(val);
+    Titerator_tupleDone(xid,it);
+  }
+  Titerator_close(xid,it);
+  tuples[count] = 0;
+  return tuples;
+}
diff --git a/src/apps/referential/algebra.h b/src/apps/referential/algebra.h
new file mode 100644
index 0000000..e2ff865
--- /dev/null
+++ b/src/apps/referential/algebra.h
@@ -0,0 +1,19 @@
+#ifndef __ALGEBRA_H
+#define __ALGEBRA_H
+/* Relational-algebra iterators and the query parser built on them.
+   Uses recordid and lladdIterator_t, which this header does not itself
+   declare, so it must be included after the header that does. */
+lladdIterator_t* ReferentialAlgebra_OpenTableScanner(int xid, recordid catalog,
+                                                     char * tablename);
+lladdIterator_t* ReferentialAlgebra_Select(int xid, lladdIterator_t* it,
+                                           char ** predicate);
+lladdIterator_t* ReferentialAlgebra_Project(int xid, lladdIterator_t * it,
+                                            char ** project);
+lladdIterator_t* ReferentialAlgebra_Join(int xid, char ** pred,
+                                         lladdIterator_t * outer_it,
+                                         lladdIterator_t * inner_it);
+lladdIterator_t* ReferentialAlgebra_KeyValCat(int xid, lladdIterator_t * it);
+char * tplRid(recordid rid);
+recordid ridTpl(char * tpl);
+
+void ReferentialAlgebra_init();
+
+char ** executeQuery(int xid, recordid hash, char * line);
+lladdIterator_t * parseExpression(int xid, recordid catalog, char **head);
+char** parseTuple(char ** head);
+
+#endif
diff --git a/src/apps/referential/dml.c b/src/apps/referential/dml.c
new file mode 100644
index 0000000..0add162
--- /dev/null
+++ b/src/apps/referential/dml.c
@@ -0,0 +1,61 @@
+#define _GNU_SOURCE
+// NOTE(review): header names reconstructed from the identifiers used below.
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+
+#include "dml.h"
+#include "algebra.h"
+/**
+   Execute "insert <table> <key>[,<rest of tuple>]".
+
+   @param tables catalog hash that maps table names to "page,slot,size".
+   @param insert the whole command line, starting with "insert".
+   @return 1 on success, 0 on a parse error or unknown table (a message is
+           printed in both cases).
+*/
+int executeInsert(int xid, recordid tables, char * insert) {
+  int ok = 0;
+  char * table = 0;
+  char * tupcopy = 0;
+  // strtok_r: interpreter threads run concurrently, so strtok()'s hidden
+  // static state is not safe here.
+  char * save = 0;
+  char * linecopy = strdup(insert+strlen("insert"));
+  char * tbl = strtok_r(linecopy, " ", &save);
+  char * tup = strtok_r(NULL, "\r\n", &save);
+  if((!tbl) || (!tup)) {
+    printf("parse error\n");
+    goto done;
+  }
+  tupcopy = strdup(tup);
+  char * save2 = 0;
+  char * key = strtok_r(tupcopy, ",", &save2);
+  char * trunctup = strtok_r(NULL, "\r\n", &save2);
+  if(!key) {  // e.g. a tuple consisting only of commas
+    printf("parse error\n");
+    goto done;
+  }
+  if(!trunctup) {
+    trunctup = "";
+  }
+  int sz = ThashLookup(xid, tables, (byte*)tbl, strlen(tbl)+1, (byte**)&table);
+  if(sz == -1) {
+    printf("Unknown table %s\n", tbl);
+    table = 0;  // nothing was returned to free
+    goto done;
+  }
+  assert((size_t)sz == strlen(table)+1);
+  recordid tableRid = ridTpl(table);
+  //XXX if(debug) { printf("inserted %s=%s into %s\n", key, tup, tbl); }
+  ThashInsert(xid, tableRid, (byte*)key, strlen(key)+1,(byte*)trunctup,strlen(trunctup)+1);
+  ok = 1;
+ done:
+  // Every exit path comes through here; the early returns used to leak
+  // these buffers.
+  free(table);
+  free(tupcopy);
+  free(linecopy);
+  return ok;
+}
+/**
+   Execute "delete <table> <key>[,...]"; only the key column is used.
+
+   @return 1 on success, 0 on a parse error or unknown table.
+*/
+int executeDelete(int xid, recordid tables, char * delete) {
+  int ok = 0;
+  char * table = 0;
+  char * tupcopy = 0;
+  char * save = 0;
+  char * linecopy = strdup(delete+strlen("delete"));
+  char * tbl = strtok_r(linecopy, " ", &save);
+  char * tup = strtok_r(NULL, "\r\n", &save);
+  if((!tbl) || (!tup)) {
+    printf("parse error\n");
+    goto done;
+  }
+  tupcopy = strdup(tup);
+  char * save2 = 0;
+  char * key = strtok_r(tupcopy, ",", &save2);
+  if(!key) {
+    printf("parse error\n");
+    goto done;
+  }
+  int sz = ThashLookup(xid, tables, (byte*)tbl, strlen(tbl)+1, (byte**)&table);
+  if(sz == -1) {
+    printf("Unknown table %s\n", tbl);
+    table = 0;
+    goto done;
+  }
+  assert((size_t)sz == strlen(table)+1);
+  recordid tableRid = ridTpl(table);
+  //  if(debug) {
+  printf("deleted ->%s<- from %s\n", key, tbl);
+  ThashRemove(xid, tableRid, (byte*)key, strlen(key)+1);
+  ok = 1;
+ done:
+  free(table);
+  free(tupcopy);
+  free(linecopy);
+  return ok;
+}
diff --git a/src/apps/referential/dml.h b/src/apps/referential/dml.h
new file mode 100644
index 0000000..447f096
--- /dev/null
+++ b/src/apps/referential/dml.h
@@ -0,0 +1,4 @@
+#ifndef REFERENTIAL_DML_H
+#define REFERENTIAL_DML_H
+// NOTE(review): header name reconstructed; it must provide recordid.
+#include <stasis/transactional.h>
+
+/* Both take the full command line and return 1 on success, 0 on failure. */
+int executeDelete(int xid, recordid tables, char * delete);
+int executeInsert(int xid, recordid tables, char *insert);
+
+#endif
diff --git a/src/apps/referential/test-script.ref b/src/apps/referential/test-script.ref
new file mode 100644
index 0000000..f24de27
--- /dev/null
+++ b/src/apps/referential/test-script.ref
@@ -0,0 +1,26 @@
+query
+query TABLES
+query {s (1) TABLES}
+query {s (TABLES,*,*,*) TABLES}
+query {s (TABLES,*,*,*,*) 
TABLES}
+query {p (1,2,3,4) TABLES}
+query {p (0,1,2,3) TABLES}
+query {p (0,0,0,0) TABLES}
+query {p (0,1,2,3,0,1,2,3,0) TABLES}
+query {p (0,1,2,3,0,1,2,3,0) xxx}
+query {p (0) xxx}
+query {s (*) xxx}
+query {j (1=1) xxx yyy}
+query {p (a,b,c) TABLES}
+query {p (0,1,2) TABLES}
+query {s () TABLES}
+query {p () TABLES}
+query {s () {p () TABLES} }
+query {j () TABLES TABLES}
+query {j (0=0) TABLES TABLES}
+query {j (0=0,1=1) TABLES TABLES}
+query {j (0=0,1=2) TABLES TABLES}
+query {j (0<0) TABLES TABLES}
+query {x y z}
+create foo
+insert foo bar
diff --git a/src/apps/referential/toplevel.c b/src/apps/referential/toplevel.c
new file mode 100644
index 0000000..554f0de
--- /dev/null
+++ b/src/apps/referential/toplevel.c
@@ -0,0 +1,362 @@
+#define _GNU_SOURCE
+
+// NOTE(review): header names reconstructed from the identifiers used below;
+// confirm the Stasis header path against the source tree.
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <unistd.h>
+#include <pthread.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#include <stasis/transactional.h>
+
+#include "algebra.h"
+#include "dml.h"
+
+#define MAX_CONN_QUEUE 10
+#define THREAD_POOL 20
+#define SHUTDOWN_SERVER 1
+
+/** Per-thread start-up argument; freed by the thread itself. */
+typedef struct thread_arg {
+  int id;
+  recordid hash;
+} thread_arg;
+
+/* interpreter_mut protects nextSocket, shuttingdown and the per-thread
+   arrays.  The accept loop publishes a connection in nextSocket and signals
+   interpreter_cond; the worker that takes it signals nextSocket_cond. */
+pthread_mutex_t interpreter_mut = PTHREAD_MUTEX_INITIALIZER;
+pthread_cond_t interpreter_cond = PTHREAD_COND_INITIALIZER;
+pthread_cond_t nextSocket_cond = PTHREAD_COND_INITIALIZER;
+FILE * nextSocket = 0;
+
+int shuttingdown = 0;
+#define INTERPRETER_BUSY 1
+#define INTERPRETER_IDLE 0
+
+char interpreterStates[THREAD_POOL];
+pthread_t interpreters[THREAD_POOL];
+FILE * interpreterConnections[THREAD_POOL];
+
+int openInterpreter(FILE * in, FILE * out, recordid hash);
+
+/**
+   Worker thread body: repeatedly take the connection published in
+   nextSocket, run an interpreter session on it and close it.  Exits once
+   shuttingdown is set.
+
+   @param arg a malloc()ed thread_arg; freed here.
+*/
+void * interpreterThread(void * arg) {
+  thread_arg * a = arg;
+  int i = a->id;
+  pthread_mutex_lock(&interpreter_mut);
+  while(!shuttingdown) {
+    while(!nextSocket && !shuttingdown) {
+      pthread_cond_wait(&interpreter_cond, &interpreter_mut);
+    }
+    if(shuttingdown) { break; }
+    interpreterStates[i] = INTERPRETER_BUSY;
+    interpreterConnections[i] = nextSocket;
+    nextSocket = 0;
+    pthread_cond_signal(&nextSocket_cond);
+    pthread_mutex_unlock(&interpreter_mut);
+
+    // The session runs without the mutex so other workers can take
+    // connections meanwhile.
+    int ret = openInterpreter(interpreterConnections[i],
+                              interpreterConnections[i],a->hash);
+
+    pthread_mutex_lock(&interpreter_mut);
+    if(ret == SHUTDOWN_SERVER) {
+      shuttingdown = 1;
+    }
+    interpreterStates[i] = INTERPRETER_IDLE;
+    fclose(interpreterConnections[i]);
+    interpreterConnections[i] = 0;
+  }
+  pthread_mutex_unlock(&interpreter_mut);
+  free(a);
+  return 0;
+}
+
+/* Print the prompt and flush it.  The flush is required, not cosmetic: in
+   and out may be the same "w+" socket stream, and an update stream must be
+   flushed between a write and the following read. */
+static void prompt(FILE * out) {
+  fprintf(out, "> ");
+  fflush(out);
+}
+
+/**
+   Run a read-eval-print loop: read commands from in, write results to out.
+
+   Commands: "create <table>", "insert ...", "delete ...", "query ...", and
+   the meta-commands !debug, !regions, !autocommit, !parseTuple,
+   !parseExpression, !exit and !shutdown.  While autocommit is on (the
+   default) each command runs in its own transaction.
+
+   @param hash recordid of the catalog (TABLES) hash.
+   @return SHUTDOWN_SERVER if the session ended with !shutdown, else 0.
+*/
+int openInterpreter(FILE * in, FILE * out, recordid hash) {
+  char * line = NULL;
+  size_t len = 0;
+  int AUTOCOMMIT = 1;
+  int debug = 0;
+  int xid = -1;
+  ssize_t read;  // getline() returns ssize_t; -1 means EOF or error
+
+  prompt(out);
+  int ret = 0;
+  if(!AUTOCOMMIT) { xid = Tbegin(); }
+  while((read = getline(&line, &len, in)) != -1) {
+    if(line[0] == '!') {
+      if(!strncmp(line+1,"debug",strlen("debug"))) {
+        debug = !debug;
+        if(debug)
+          fprintf(out, "Enabling debugging\n");
+        else
+          fprintf(out, "Disabling debugging\n");
+      } else if(!strncmp(line+1,"regions",strlen("regions"))) {
+        fprintf(out, "Boundary tag pages:\n");
+        pageid_t pid = REGION_FIRST_TAG;
+        boundary_tag tag;
+        TregionReadBoundaryTag(-1,pid,&tag);
+        int done = 0;
+        while(!done) {
+          fprintf(out, "\tpageid=%lld\ttype=%d\tsize=%d\n", (long long)pid, tag.allocation_manager, tag.size);
+          if(tag.size == UINT32_MAX) { fprintf(out, "\t[EOF]\n"); }
+          int err = TregionNextBoundaryTag(-1,&pid,&tag,0);
+          if(!err) { done = 1; }
+        }
+      } else if(!strncmp(line+1,"autocommit",strlen("autocommit"))) {
+        if(AUTOCOMMIT) {
+          // we're not in a transaction
+          fprintf(out, "Disabling autocommit\n");
+          AUTOCOMMIT = 0;
+          xid = Tbegin();
+        } else {
+          fprintf(out, "Enabling autocommit\n");
+          AUTOCOMMIT = 1;
+          Tcommit(xid);
+        }
+      } else if(!strncmp(line+1,"parseTuple",strlen("parseTuple"))) {
+        char * c = line + 1 + strlen("parseTuple");
+        char ** toks = parseTuple(&c);
+        if(toks) {
+          for(int i = 0; toks[i]; i++) {
+            fprintf(out, "col %d = ->%s<-\n", i, toks[i]);
+            free(toks[i]);
+          }
+          free(toks);
+        } else {
+          // parseTuple() returns NULL on a syntax error.
+          fprintf(out, "parse error\n");
+        }
+        fprintf(out, "trailing stuff: %s", c);
+      } else if(!strncmp(line+1,"parseExpression",
+                         strlen("parseExpression"))) {
+        char * c = line + 1 + strlen("parseExpression");
+        lladdIterator_t * it = parseExpression(xid, hash, &c);
+        // Parse-only command: close the iterator rather than dropping it.
+        if(it) { Titerator_close(xid, it); }
+      } else if(!strncmp(line+1,"exit",strlen("exit"))) {
+        break;
+      } else if(!strncmp(line+1,"shutdown",strlen("shutdown"))) {
+        ret = SHUTDOWN_SERVER;
+        break;
+      }
+    } else {
+      if(!strncmp(line,"create",strlen("create"))) {
+
+        char * linecopy = strdup(line+strlen("create"));
+        char * saveptr = 0;
+        char * tablename = strtok_r(linecopy, " \r\n", &saveptr);
+
+        if(!tablename) {
+          // "create" with no table name used to reach strlen(NULL).
+          fprintf(out, "parse error\n");
+        } else {
+          byte* delme;
+
+          if(AUTOCOMMIT) xid = Tbegin();
+
+          if(-1 == ThashLookup(xid, hash, (byte*)tablename,strlen(tablename)+1,&delme)) {
+            fprintf(out, "Creating table %s\n", tablename);
+            recordid newHash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
+            char * tpl = tplRid(newHash);
+            char * insert;
+            // A table is registered as a row of the TABLES catalog.
+            asprintf(&insert, "insert TABLES %s,%s\n", tablename, tpl);
+
+            executeInsert(xid, hash, insert);
+
+            free(insert);
+            free(tpl);
+          } else {
+            fprintf(out, "Table already exists: %s\n", tablename);
+            free(delme);
+          }
+
+          if(AUTOCOMMIT) Tcommit(xid);
+        }
+
+        free(linecopy);
+      } else if(!strncmp(line,"query",strlen("query"))) {
+        char * linecopy = strdup(line+strlen("query"));
+
+        // Queries get their own transaction in autocommit mode, like insert
+        // and delete.  Before, they ran under -1 or an already committed
+        // xid.
+        if(AUTOCOMMIT) xid = Tbegin();
+        char ** result = executeQuery(xid, hash, linecopy);
+        if(AUTOCOMMIT) Tcommit(xid);
+
+        if(result) {
+          for(int i = 0; result[i]; i++) {
+            fprintf(out, "%s\n", result[i]);
+            free(result[i]);
+          }
+          free(result);
+        } else {
+          fprintf(out, "query failed\n");
+        }
+        free(linecopy);
+      } else if(!strncmp(line,"insert",strlen("insert"))) {
+        if(AUTOCOMMIT) xid = Tbegin();
+        executeInsert(xid, hash, line);
+        if(AUTOCOMMIT) Tcommit(xid);
+      } else if(!strncmp(line,"delete",strlen("delete"))) {
+        if(AUTOCOMMIT) xid = Tbegin();
+        executeDelete(xid, hash, line);
+        if(AUTOCOMMIT) Tcommit(xid);
+      } else {
+        fprintf(out, "I don't understand...\n");
+      }
+    }
+    prompt(out);
+  }
+  if(!AUTOCOMMIT) { Tcommit(xid); }
+
+  free(line);
+  fprintf(out, "\n");
+  fflush(out);
+  return ret;
+}
+
+/**
+   Listen on 127.0.0.1 and hand each accepted connection to a pool of
+   THREAD_POOL interpreter threads.  Returns once a session has requested
+   shutdown and every worker has exited.
+
+   @param addr passed to getaddrinfo() as the service, i.e. a port number or
+               service name; the host is fixed to 127.0.0.1.
+   @param hash recordid of the catalog hash, forwarded to each session.
+*/
+void startServer(char * addr, recordid hash) {
+  struct addrinfo hints;
+  struct addrinfo *result, *rp;
+  int sfd = -1, s;
+
+  memset(&hints, 0, sizeof(struct addrinfo));
+  hints.ai_family = AF_INET;       /* IPv4 only (AF_UNSPEC would allow IPv6) */
+  hints.ai_socktype = SOCK_STREAM; /* Stream (TCP) socket */
+  hints.ai_flags = AI_PASSIVE;     /* Socket will be bound and accept()ed on */
+  hints.ai_protocol = 0;           /* Any protocol */
+
+  printf("Listening on socket\n");
+
+  s = getaddrinfo("127.0.0.1", addr, &hints, &result);
+  if (s != 0) {
+    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
+    exit(EXIT_FAILURE);
+  }
+
+  /* getaddrinfo() returns a list of address structures.
+     Try each address until we successfully bind().
+     If socket(2) (or bind(2)) fails, we (close the socket
+     and) try the next address. */
+
+  for (rp = result; rp != NULL; rp = rp->ai_next) {
+    sfd = socket(rp->ai_family, rp->ai_socktype,
+                 rp->ai_protocol);
+    if (sfd == -1)
+      continue;
+
+    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0)
+      break;                  /* Success */
+
+    close(sfd);
+  }
+
+  if (rp == NULL) {               /* No address succeeded */
+    perror("Could not bind");
+    exit(EXIT_FAILURE);
+  }
+
+  freeaddrinfo(result);           /* No longer needed */
+
+  int err = listen(sfd, MAX_CONN_QUEUE);
+  if(err == -1) {
+    perror("Couldn't listen()");
+    close(sfd);
+    return;
+  }
+
+  printf("Spawning servers.\n"); fflush(NULL);
+
+  for(int i = 0; i < THREAD_POOL; i++) {
+    thread_arg * arg = malloc(sizeof(thread_arg));
+    arg->id = i;
+    arg->hash = hash;
+    interpreterStates[i] = INTERPRETER_IDLE;
+    interpreterConnections[i] = 0;
+    pthread_create(&interpreters[i], 0, interpreterThread, arg);
+  }
+
+  printf("Ready for connections.\n"); fflush(NULL);
+
+  /* Accept connections and hand each one to an idle interpreter thread. */
+
+  for (;;) {
+    struct sockaddr_storage peer_addr;
+    // accept() reads this length as well as writing it, so it must hold the
+    // buffer size before every call; it used to be passed uninitialised.
+    socklen_t peer_addr_len = sizeof(peer_addr);
+    int fd = accept(sfd, (struct sockaddr*)&peer_addr, &peer_addr_len);
+    if(fd == -1) {
+      perror("Error accepting connection");
+      continue;
+    }
+    FILE * sock = fdopen(fd, "w+");
+    if(!sock) {
+      perror("Couldn't fdopen() connection");
+      close(fd);
+      continue;
+    }
+
+    pthread_mutex_lock(&interpreter_mut);
+
+    // Wait until the previously published connection has been taken.
+    while(nextSocket) {
+      pthread_cond_wait(&nextSocket_cond, &interpreter_mut);
+    }
+
+    nextSocket = sock;
+    pthread_cond_signal(&interpreter_cond);
+
+    int stop = shuttingdown;  // read while still holding the mutex
+    pthread_mutex_unlock(&interpreter_mut);
+    if(stop) {
+      break;
+    }
+  }
+
+  pthread_cond_broadcast(&interpreter_cond);
+  for(int i = 0; i < THREAD_POOL; i++) {
+    pthread_join(interpreters[i],0);
+  }
+  close(sfd);
+
+}
+
+
+/**
+   Open (or create) the store, then either serve connections
+   ("--socket <service>"), run a script file, or read commands from stdin.
+*/
+int main(int argc, char * argv[]) {
+
+  Tinit();
+  ReferentialAlgebra_init();
+
+  recordid rootEntry;
+  recordid hash;
+  int xid = Tbegin();
+  if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
+    printf("Creating new store\n");
+
+    rootEntry = Talloc(xid, sizeof(recordid));
+    assert(rootEntry.page == ROOT_RECORD.page);
+    assert(rootEntry.slot == ROOT_RECORD.slot);
+
+    hash = ThashCreate(xid, VARIABLE_LENGTH, VARIABLE_LENGTH);
+
+    Tset(xid, rootEntry, &hash);
+
+    // The catalog lists itself under the name TABLES.
+    char * tpl = tplRid(hash);
+
+    ThashInsert(xid,hash,(byte*)"TABLES", strlen("TABLES")+1, (byte*)tpl, strlen(tpl)+1);
+    free(tpl);
+
+  } else {
+    printf("Opened existing store\n");
+    rootEntry.page = ROOT_RECORD.page;
+    rootEntry.slot = ROOT_RECORD.slot;
+    rootEntry.size = sizeof(recordid);
+
+    Tread(xid, rootEntry, &hash);
+  }
+
+  Tcommit(xid);
+
+  FILE * in;
+  if(argc == 3) { // listen on socket
+    if(strcmp("--socket", argv[1])) {
+      printf("usage:\n\n%s\n%s \n%s --socket addr:port\n",
+             argv[0],argv[0],argv[0]);
+    } else {
+      startServer(argv[2], hash);
+    }
+
+    printf("Shutting down...\n");
+  } else {
+    if(argc == 2) {
+      in = fopen(argv[1], "r");
+      if(!in) {
+        printf("Couldn't open input file.\n");
+        return 1;
+      }
+    } else {
+      in = stdin;
+    }
+    openInterpreter(in, stdout, hash);
+    if(in != stdin) { fclose(in); }
+  }
+  Tdeinit();
+  return 0;
+}