removed dead files, switch to CMake

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@523 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-01-26 00:37:05 +00:00
parent 648a725920
commit 78f9ad12c1
11 changed files with 0 additions and 2083 deletions

View file

@ -1,77 +0,0 @@
/*! \file log4_util.cc
* \brief This file has the helper functions for log4cpp;
*
* Copyright (c) 2008 Yahoo, Inc.
* All rights reserved.
*/
#include <iostream>
#include <log4cpp/PropertyConfigurator.hh>
#include "LogUtils.h"
using namespace log4cpp;
using namespace std;
// hacked link to actioncontext
std::string s_trackPathLog;
LogMethod::
LogMethod(log4cpp::Category& log, log4cpp::Priority::Value priority,
const char *function) :
log_(log), priority_(priority), function_(function)
{
if(log_.isPriorityEnabled(priority_)) {
log_.getStream(priority_) << "Entering: " << function_;
}
}
LogMethod::
~LogMethod()
{
if(log_.isPriorityEnabled(priority_)) {
log_.getStream(priority_) << "Exiting: " << function_;
}
}
// Protects against multiple calls (won't try to re-init) and gives
// back the same answer the original call got.
static int log4cppInitResult = -1;
bool
initLog4cpp(const string &confFile)
{
if (log4cppInitResult != -1) {
return (log4cppInitResult == 0 ? true : false);
}
log4cppInitResult = 0; // Assume success.
try {
PropertyConfigurator::configure(confFile);
} catch (log4cpp::ConfigureFailure &e) {
cerr << "log4cpp configuration failure while loading '" <<
confFile << "' : " << e.what() << endl;
log4cppInitResult = 1;
} catch (std::exception &e) {
cerr << "exception caught while configuring log4cpp via '" <<
confFile << "': " << e.what() << endl;
log4cppInitResult = 1;
} catch (...) {
cerr << "unknown exception while configuring log4cpp via '" <<
confFile << "'." << endl;
log4cppInitResult = 1;
}
return (log4cppInitResult == 0 ? true : false);
}
/*
* For customized vim control
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: sw=4:ts=4:et
* vim<600: sw=4:ts=4:et
*/

View file

@ -1,130 +0,0 @@
/* Copyright (C) 2007 Yahoo! Inc. All Rights Reserved. */
#ifndef LOG_UTIL_H
#define LOG_UTIL_H
#include <log4cpp/Category.hh>
#include "StringUtils.h"
/**
* Quick and dirty link between LogUtils and ActionContext without having to
* resolve cross-inclusion issues, or force all components to start including
* ActionContext if they don't already.
*/
extern std::string s_trackPathLog;
// These macros cannot be protected by braces because of the trailing stream
// arguments that get appended. Care must taken not to use them inside if/else
// blocks that do not use curly braces.
// I.e., the following will give unexpected results:
// if(foo)
// DHT_DEBUG_STREAM() << "heyheyhey";
// else
// blah();
// The 'else' will end up applying to the 'if' within the debug macro.
// Regardless of this, our standards say to always use curly brackets
// on every block anyway, no matter what.
#define DHT_DEBUG_STREAM() if(log.isDebugEnabled()) log.debugStream() << __FUNCTION__ << "():" << __LINE__ << ":"
#define DHT_INFO_STREAM() if(log.isInfoEnabled()) log.infoStream() << __FUNCTION__ << "():" << __LINE__ << ":"
#define DHT_INFO_WITH_STACK_STREAM() if(log.isInfoEnabled()) log.infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_WARN_STREAM() if(log.isWarnEnabled()) log.warnStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_ERROR_STREAM() if(log.isErrorEnabled()) log.errorStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_CRIT_STREAM() if(log.isCritEnabled()) log.critStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_TRACE_PRIORITY log4cpp::Priority::DEBUG + 50
#define DHT_TRACE_STREAM() if (log.isPriorityEnabled(DHT_TRACE_PRIORITY)) log.getStream(DHT_TRACE_PRIORITY) << __FUNCTION__ << "():" << __LINE__ << ":"
// Sadly, sometimes 'log' is reserved by someone else so the code needs to
// use a different name for log. In that case, it can be passed in to these.
#define DHT_DEBUG_STREAML(x_log_hdl_x) if((x_log_hdl_x).isDebugEnabled()) (x_log_hdl_x).debugStream() << __FUNCTION__ << "():" << __LINE__ << ":"
#define DHT_INFO_STREAML(x_log_hdl_x) if((x_log_hdl_x).isInfoEnabled()) (x_log_hdl_x).infoStream() << __FUNCTION__ << "():" << __LINE__ << ":"
#define DHT_INFO_WITH_STACK_STREAML(x_log_hdl_x) if((x_log_hdl_x).isInfoEnabled()) (x_log_hdl_x).infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_WARN_STREAML(x_log_hdl_x) if((x_log_hdl_x).isWarnEnabled()) (x_log_hdl_x).warnStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_ERROR_STREAML(x_log_hdl_x) if((x_log_hdl_x).isErrorEnabled()) (x_log_hdl_x).errorStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_CRIT_STREAML(x_log_hdl_x) if((x_log_hdl_x).isCritEnabled()) (x_log_hdl_x).critStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog
#define DHT_TRACE_STREAML(x_log_hdl_x) if ((x_log_hdl_x).isPriorityEnabled(DHT_TRACE_PRIORITY)) (x_log_hdl_x).getStream(DHT_TRACE_PRIORITY) << __FUNCTION__ << "():" << __LINE__ << ":"
//Macros to use when a function returns on error without writing any log message
// or error translation
#define RETURN_IF_NOT_OK(x_call_x) \
{ \
FwCode::ResponseCode rcx___ = (x_call_x); \
if(rcx___ != FwCode::FwOk) { \
return rcx___; \
} \
}
#define RETURN_THIS_IF_NOT_OK(x_othercode_x, x_call_x) \
{ \
FwCode::ResponseCode rcx___ = (x_call_x); \
if(rcx___ != FwCode::FwOk) { \
return (x_othercode_x); \
} \
}
/// Caution! Only use in checks for 'impossible' code conditions. Regular errors
/// should be handled regularly
#define BAD_CODE_ABORT() \
{ \
std::string x_msg_x("Bad code at " __FILE__ ":"); \
x_msg_x.append(StringUtils::toString(__LINE__)); \
throw std::runtime_error(x_msg_x); \
}
#define BAD_CODE_IF_NOT_OK(x_call_x) \
do {\
if((x_call_x) != FwCode::FwOk) { \
BAD_CODE_ABORT(); \
} \
} while(0)
/*
* Above macros are meant to be used by all components.
*/
/**
* Class that allows for method entry/exit logging with a single declaration.
* Always uses debug.
*/
class LogMethod
{
public:
LogMethod(log4cpp::Category& log, log4cpp::Priority::Value priority,
const char *function);
virtual ~LogMethod();
private:
log4cpp::Category& log_;
log4cpp::Priority::Value priority_;
const char *function_;
};
// convenience macros to use the above class
#define LOG_METHOD() LogMethod log_method_entry_exit(log, log4cpp::Priority::DEBUG, __FUNCTION__)
#define TRACE_METHOD() LogMethod log_method_entry_exit(log, DHT_TRACE_PRIORITY, __FUNCTION__)
/** Initialize log4cpp config file.
* This function needs to be called once for each executable. Multiple
* initializations will return the result of the first initialization (IOW,
* an executable can be initialized with exactly one config file). Errors
* encountered by this function are printed onto cerr. See log4cpp
* documentation for what happens when PropertyConfigurator::configure()
* fails.
* \param confFile is the path name of the log4cpp config file.
* Depending on the machine that the executable is running in, the path
* will be different.
* \return true if the initialization succeeds, false if it fails.
*/
bool initLog4cpp(const std::string & confFile);
#endif
/*
* For customized vim control
* Local variables:
* tab-width: 4
* c-basic-offset: 4
* End:
* vim600: sw=4:ts=4:et
* vim<600: sw=4:ts=4:et
*/

View file

@ -1,74 +0,0 @@
STASIS_DIR=../stasis
LIB=$(STASIS_DIR)/build/src/stasis \
-L/home/y/lib
INCLUDE=-I$(STASIS_DIR)/src/ -I$(STASIS_DIR) -I./ \
-I/home/y/include
LIBLIST=-lpthread \
-lstasis \
-lm
# -licui18n \
# -licuuc \
# -licudata \
# -licuio \
# -llog4cpp_y \
# -lthoth
FLAGS=-pg -g -O1
#FLAGS=-O3
HFILES=logserver.h logstore.h logiterators.h datapage.h merger.h tuplemerger.h datatuple.h
CFILES=logserver.cpp logstore.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp
# STASIS_DIR=../stasis
# LD_LIBRARY_PATH=$STASIS_DIR/build/src/stasis
# LD_LIBRARY_PATH=$STASIS_DIR/build/src/stasis ./hello
logstore: check_gen.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
test: dp_check lt_check ltable_check merger_check rb_check \
lmerger_check tmerger_check server_check tcpclient_check
lt_check: check_logtree.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
dp_check: check_datapage.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
ltable_check: check_logtable.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
merger_check: check_merge.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
lmerger_check: check_mergelarge.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
tmerger_check: check_mergetuple.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
rb_check: check_rbtree.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
server_check: check_server.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
tcpclient_check: check_tcpclient.cpp $(HFILES) $(CFILES)
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
hello : hello.cpp UCharUtils.cc LogUtils.cc
g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS)
clean:
rm -f logstore server_check hello lt_check merger_check lmerger_check rb_check \
dp_check ltable_check tmerger_check rose tcpclient_check
veryclean: clean
rm -f *~ gmon.out prof.res

View file

@ -1,326 +0,0 @@
/* $Id: UCharUtils.cc,v 1.16 2009/03/03 20:19:18 dlomax Exp $ */
/* Copyright (C) 2008 Yahoo! Inc. All Rights Reserved. */
//#include <dht/UCharUtils.h>
#include "UCharUtils.h"
#include <log4cpp/Category.hh>
#include "LogUtils.h"
//#include "ActionContext.h"
#include <unicode/ucnv.h>
#include <unicode/unorm.h>
#include <thoth/validate.h> // To make sure we have UTF-8
static log4cpp::Category &log =
log4cpp::Category::getInstance("dht.framework." __FILE__);
UCharUtilsImpl *UCharUtils::instance_ = NULL;
UCharUtilsImpl::
UCharUtilsImpl() : uconv_(NULL) {
LOG_METHOD();
ucBuffLen = 0;
ucBuff = NULL;
ucNormBuffLen = 0;
ucNormBuff = NULL;
charBuffLen = 0;
charBuff = NULL;
}
FwCode::ResponseCode UCharUtilsImpl::
init()
{
UErrorCode erc = U_ZERO_ERROR;
uconv_ = ucnv_open("utf-8", &erc);
if (uconv_ == NULL) {
DHT_ERROR_STREAM() << "EC:UNICODE:Problem geting utf-8 converter, erc:" << erc
<< ", " << u_errorName(erc);
return FwCode::UcnvOpenFailed;
}
return FwCode::FwOk;
}
UCharUtilsImpl::
~UCharUtilsImpl() {
reset();
if (uconv_ != NULL) {
ucnv_close(uconv_);
uconv_ = NULL;
}
}
void UCharUtilsImpl::
reset() {
LOG_METHOD();
if (ucBuff != NULL) {
delete[] ucBuff;
ucBuffLen = 0;
ucBuff = NULL;
}
if (ucNormBuff != NULL) {
delete[] ucNormBuff;
ucNormBuffLen = 0;
ucNormBuff = NULL;
}
if (charBuff != NULL) {
delete[] charBuff;
charBuffLen = 0;
charBuff = NULL;
}
}
/**
* Small wrapper to hide multi-line thoth api inside single-line call.
*/
bool UCharUtils::
isUTF8(const std::string& value)
{
size_t pos = 0;
thoth_result result = thoth_validate_utf8(value.c_str(), value.length(),
&pos);
if(result != UTF8_VALID) {
std::cerr
//RESPONSE_DEBUG_STREAM(FwCode::DataNotUtf8)
<< "value (" << value << ") is not UTF-8. thoth_result:" << result
<< ", position=" << pos;
return false;
}
return true;
}
/**
* Small wrapper to hide multi-line thoth api inside single-line call.
*/
bool UCharUtils::
isUTF8(const char * value, size_t value_len)
{
size_t pos = 0;
thoth_result result = thoth_validate_utf8(value, value_len, &pos);
if(result != UTF8_VALID) {
//RESPONSE_DEBUG_STREAM(FwCode::DataNotUtf8)
std::cerr
<< "value (" << std::string(value, value_len)
<< ") is not UTF-8. thoth_result:" << result
<< ", position=" << pos;
return false;
}
return true;
}
// Convert an input string (expected to be UTF-8) into unicode UChars
// The result of the conversion will be sitting in our ucBuff area.
FwCode::ResponseCode UCharUtilsImpl::
convert(const std::string &input, int32_t &len)
{
LOG_METHOD();
//UTF-8 validation
if(!UCharUtils::isUTF8(input)) {
return FwCode::DataNotUtf8;
}
int size = input.length() * 2;
// Check if we already have a big enough buffer
if (ucBuffLen < size) {
// Nope, first check if we need to release what we've been using
if (ucBuff) {
delete[] ucBuff;
}
ucBuffLen = size;
ucBuff = new UChar[ucBuffLen];
}
UErrorCode erc = U_ZERO_ERROR;
len = ucnv_toUChars(uconv_,
ucBuff,
ucBuffLen,
input.data(),
input.length(), &erc);
if (U_FAILURE(erc)) {
//RESPONSE_ERROR_STREAM(FwCode::ConvertToUCharFailed)
std::cerr
<< "EC:UNICODE:error:" << erc
<< ", " << u_errorName(erc)
<< " from converting input:'" << input << "'";
len = 0;
return FwCode::ConvertToUCharFailed;
}
return FwCode::FwOk;
}
// Normalize an input string. Note that all three internal buffers will
// be used by this operation, but by the time we finish, we'll be done
// with them.
FwCode::ResponseCode UCharUtilsImpl::
normalize(const std::string &input, std::string &result /* out */)
{
LOG_METHOD();
// convert our UTF-8 into UChar
int32_t inLen = 0;
FwCode::ResponseCode rc = convert(input, inLen);
if (rc != FwCode::FwOk) {
result.erase();
return rc;
}
// Do a quick check if the input is already normalized so that
// we can duck out early
UErrorCode status = U_ZERO_ERROR;
if (unorm_quickCheck(ucBuff, inLen,
UNORM_NFC, &status) == UNORM_YES) {
DHT_DEBUG_STREAM() << "already normalized input:" << input;
result = input;
return FwCode::FwOk;
}
// Check if we have enough space for the normalized result.
// We'll make the output space twice as big as the input (although
// it's more likely that the normalized result will be shorter
// as it combines characters. E.g. 'A' 'put an accent on the previous'
int32_t newSize = inLen * 2;
if (newSize > ucNormBuffLen) {
DHT_DEBUG_STREAM() << "newSize:" << newSize
<< " ucNormBuffLen:" << ucNormBuffLen;
if (ucNormBuff) {
delete[] ucNormBuff;
}
ucNormBuffLen = newSize;
ucNormBuff = new UChar[ucNormBuffLen];
}
// Do the actual normalization
status = U_ZERO_ERROR;
int32_t normLen = unorm_normalize(ucBuff, inLen,
UNORM_NFC, 0,
ucNormBuff,
ucNormBuffLen,
&status);
if(U_FAILURE(status)) {
//RESPONSE_ERROR_STREAM(FwCode::FwError)
std::cerr
<< "EC:UNICODE:error:" << status << ", " << u_errorName(status)
<<" in unorm_normalize, inLen:" << inLen
<< " ucNormBuffLen:" << ucNormBuffLen;
return FwCode::FwError;
}
// Make sure we have some space to convert back to UTF-8
int32_t resultLen = normLen * 4;
if (resultLen > charBuffLen) {
DHT_DEBUG_STREAM() << "resultLen:" << resultLen
<< " charBuffLen:" << charBuffLen;
if (charBuff) {
delete[] charBuff;
charBuff= NULL;
}
charBuffLen = resultLen;
charBuff = new char[charBuffLen];
}
DHT_DEBUG_STREAM() <<"calling ucnv_fromUChars, normLen:" << normLen;
// Go from UChar array to UTF-8
int32_t actualLen = ucnv_fromUChars(uconv_,
charBuff, charBuffLen,
ucNormBuff, normLen,
&status);
if(U_FAILURE(status)) {
//RESPONSE_ERROR_STREAM(FwCode::FwError)
std::cerr
<< "EC:UNICODE:error:" << status << ", " << u_errorName(status)
<< " in ucnv_fromUChars charBuffLen:" << charBuffLen
<< " normLen:" << normLen;
return FwCode::FwError;
}
// Smack our UTF-8 characters into the result string
result.assign(charBuff, actualLen);
DHT_DEBUG_STREAM() << "leaving actualLen:" << actualLen
<< " result:" << result;
return FwCode::FwOk;
}
FwCode::ResponseCode UCharUtils::
init()
{
if (instance_ == NULL) {
instance_ = new UCharUtilsImpl();
return instance_->init();
}
return FwCode::FwOk; // already initialized
}
void UCharUtils::
close()
{
if(instance_ != NULL) {
delete instance_;
instance_ = NULL;
}
}
// Given an input string, return a unicode UChar array. Note that the
// return value is a pointer to our internal buffer.
UChar * UCharUtils::
getUChar(const std::string &input, int32_t& len) {
LOG_METHOD();
// do the conversion...somehow need 2x input len for utf8 to utf16
if(instance_->convert(input, len) != FwCode::FwOk) {
len = 0;
return NULL;
}
return instance_->ucBuff;
}
FwCode::ResponseCode UCharUtils::
normalize(const std::string &input, std::string &result) {
LOG_METHOD();
return(instance_->normalize(input, result));
}
FwCode::ResponseCode UCharUtils::
parseRegExpPattern(const std::string &pattern,
URegularExpression * & result /* out */)
{
UParseError perr;
UErrorCode erc = U_ZERO_ERROR;
int32_t ureglen = 0;
// Do not delete uregexp, it's a static reusable buffer inside UCharUtils
UChar *uregexp = UCharUtils::getUChar(pattern, ureglen);
if (uregexp == NULL) {
//RESPONSE_ERROR_STREAM(FwCode::ConvertToUCharFailed)
std::cerr
<< "EC:UNICODE|IMPOSSIBLE:Unable to convert pattern to unicode: " << pattern;
return FwCode::ConvertToUCharFailed;
}
URegularExpression *regexp= uregex_open(uregexp, ureglen, 0,
&perr,
&erc);
if(erc != U_ZERO_ERROR) {
//RESPONSE_DEBUG_STREAM(FwCode::CompileRegExFailed)
std::cerr
<< "Compiling regex failed at: " << perr.offset
<< "; re=" << pattern;
return FwCode::CompileRegExFailed;
}
result = regexp;
return FwCode::FwOk;
}

View file

@ -1,139 +0,0 @@
/* Copyright (C) 2008 Yahoo! Inc. All Rights Reserved. */
#ifndef UCHAR_UTILS_H
#define UCHAR_UTILS_H
#include <unicode/ucnv.h>
#include <string>
#include "FwCode.h"
#include <unicode/uregex.h>
// Forward declaration
class UCharUtilsImpl;
/**
* Some handy utilities for working with unicode characters. Yes, these
* could have just been some regular routines instead of static methods
* in a class, but doing it this way gives us some containment of what
* other static tidbits might be necessary (like reusable buffer space).
* which are all hidden within the UCharUtilsImpl class.
*
* This is a singleton - do not use in a threaded program.
*/
class UCharUtils {
private:
/**
* Our pointer to all sorts of goodness.
*/
static UCharUtilsImpl *instance_;
public:
/**
* Initialize the utilities. Primarily opens the utf-8 converter.
* Calling this is required prior to using the converter.
*
* @return FwCode::FwOk on success, FwCode::UcnvOpenFailed on
* failure.
*/
static FwCode::ResponseCode init();
/**
* Release all resources. <code>init()</code> must be called again
* in order to use again.
*/
static void close();
/**
* Small wrapper to hide multi-line thoth api inside single-line call.
*
* @param value string to be tested for utf-8-ness
* @return true if it is utf-8, false if not
*/
static bool isUTF8(const std::string& value);
/**
* Small wrapper to hide multi-line thoth api inside single-line call.
*
* @param value char string to be tested for utf-8-ness
* @param value_len length of <code>value</code>
* @return true if it is utf-8, false if not
*/
static bool isUTF8(const char * value, size_t value_len);
/**
* Convert utf-8 strings into UChar strings. Note that the
* result is an internal reusable buffer so the caller should
* *not* release it.
* @param input utf-8 string to convert
* @param len set to length of output string
* @return NULL if anything bad happens, otherwise an allocated UChar *
* the caller must *NEVER* free this pointer.
*/
static UChar * getUChar(const std::string &input, int32_t& len);
/**
* Do a NFC normalization so that different yet equivalent strings
* will have a single representation. See
* http://www.unicode.org/unicode/reports/tr15/
* for more information.
* @param input A UTF-8 string that we want to normalize
* @param result (output) the normalized UTF-8 string
* @return FwCode::FwOk on success,
* FwCode::FwError on conversion failure,
* FwCode::InvalidData if input was not utf-8
*/
static FwCode::ResponseCode normalize(const std::string &input,
std::string &result);
/**
* Compile a regular expression in a unicode-friendly way.
*
* @param pattern the regexp pattern to compile. Assumed to
* be utf-8.
* @param result (output) Set to point to the compiled regexp.
* Must be released by the caller via uregex_close() when
* finished with it.
* @return FwCode::FwOk if compilation succeeded,
* FwCode::CompileRegExFailed or FwCode::ConvertToUCharFailed
* on failure.
*/
static FwCode::ResponseCode parseRegExpPattern
(const std::string &pattern,
URegularExpression * & result /* out */);
};
/**
* Bug 2574599 - Impl exposed for use by multiple threads; singleton not
* appropriate for multi-threaded program.
*/
class UCharUtilsImpl
{
private:
UConverter *uconv_;
public:
UCharUtilsImpl();
~UCharUtilsImpl();
FwCode::ResponseCode init();
void reset();
FwCode::ResponseCode convert(const std::string &input, int32_t &len);
FwCode::ResponseCode normalize(const std::string &nput, std::string &result);
// Buffer used to convert from UTF-* into UChar
int32_t ucBuffLen;
UChar *ucBuff;
// Buffer used for UChar normalization output
int32_t ucNormBuffLen;
UChar *ucNormBuff;
// Buffer used to convert UChars back to UTF-8
int32_t charBuffLen;
char *charBuff;
};
#endif // _DHT_UCHAR_UTILS_

View file

@ -1,48 +0,0 @@
#include <string>
#include <string.h>
#include <iostream>
#include<stasis/transactional.h>
typedef unsigned char uchar;
typedef struct datatuple
{
typedef byte* key_t;
typedef byte* data_t;
uint32_t keylen;
uint32_t datalen;
key_t key;
data_t data;
};
int main(int argc, char** argv) {
bool * m1 = new bool(false);
std::cout << *m1 << std::endl;
datatuple t;
std::cout << "size of datatuple:\t" << sizeof(datatuple) << std::endl;
t.key = (datatuple::key_t) malloc(10);
const char * str = "12345678";
strcpy((char*)t.key, (str));
t.keylen = strlen((char*)t.key);
t.data = (datatuple::data_t) malloc(10);
const char * str2 = "1234567";
strcpy((char*)t.data, (str2));
t.datalen = strlen((char*)t.data);
std::cout << "size of datatuple:\t" << sizeof(datatuple) << std::endl;
std::cout << "keylen:\t" << t.keylen <<
"\tdatalen:\t" << t.datalen <<
"\t" << t.key <<
"\t" << t.data <<
std::endl;
}

View file

@ -1,519 +0,0 @@
#include "logserver.h"
#include "datatuple.h"
#include "logstore.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#undef begin
#undef end
#undef try
//server codes
uint8_t logserver::OP_SUCCESS = 1;
uint8_t logserver::OP_FAIL = 2;
uint8_t logserver::OP_SENDING_TUPLE = 3;
//client codes
uint8_t logserver::OP_FIND = 4;
uint8_t logserver::OP_INSERT = 5;
uint8_t logserver::OP_DONE = 6;
uint8_t logserver::OP_INVALID = 32;
void *serverLoop(void *args);
void logserver::startserver(logtable *ltable)
{
sys_alive = true;
this->ltable = ltable;
selcond = new pthread_cond_t;
pthread_cond_init(selcond, 0);
//initialize threads
for(int i=0; i<nthreads; i++)
{
struct pthread_item *worker_th = new pthread_item;
th_list.push_back(worker_th);
worker_th->th_handle = new pthread_t;
struct pthread_data *worker_data = new pthread_data;
worker_th->data = worker_data;
worker_data->idleth_queue = &idleth_queue;
worker_data->ready_queue = &ready_queue;
worker_data->work_queue = &work_queue;
worker_data->qlock = qlock;
worker_data->selcond = selcond;
worker_data->th_cond = new pthread_cond_t;
pthread_cond_init(worker_data->th_cond,0);
worker_data->th_mut = new pthread_mutex_t;
pthread_mutex_init(worker_data->th_mut,0);
worker_data->workitem = new int;
*(worker_data->workitem) = -1;
worker_data->table_lock = lsmlock;
worker_data->ltable = ltable;
worker_data->sys_alive = &sys_alive;
pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th);
idleth_queue.push(*worker_th);
}
//start server socket
sdata = new serverth_data;
sdata->server_socket = &serversocket;
sdata->server_port = server_port;
sdata->idleth_queue = &idleth_queue;
sdata->ready_queue = &ready_queue;
sdata->selcond = selcond;
sdata->qlock = qlock;
pthread_create(&server_thread, 0, serverLoop, sdata);
//start monitoring loop
eventLoop();
}
void logserver::stopserver()
{
//close the server socket
//stops receiving data on the server socket
shutdown(serversocket, 0);
//wait for all threads to be idle
while(idleth_queue.size() != nthreads)
sleep(1);
//set the system running flag to false
sys_alive = false;
for(int i=0; i<nthreads; i++)
{
pthread_item *idle_th = th_list[i];
//wake up the thread
pthread_mutex_lock(idle_th->data->th_mut);
pthread_cond_signal(idle_th->data->th_cond);
pthread_mutex_unlock(idle_th->data->th_mut);
//wait for it to join
pthread_join(*(idle_th->th_handle), 0);
//free the thread variables
pthread_cond_destroy(idle_th->data->th_cond);
delete idle_th->data->th_cond;
delete idle_th->data->th_mut;
delete idle_th->data->workitem;
delete idle_th->data;
delete idle_th->th_handle;
}
th_list.clear();
//close(serversocket);
return;
}
void logserver::eventLoop()
{
fd_set readfs;
std::vector<int> sel_list;
int maxfd;
struct timeval Timeout;
struct timespec ts;
while(true)
{
//clear readset
FD_ZERO(&readfs);
maxfd = -1;
ts.tv_nsec = 250000; //nanosec
ts.tv_sec = 0;
//Timeout.tv_usec = 250; /* microseconds */
//Timeout.tv_sec = 0; /* seconds */
//update select set
pthread_mutex_lock(qlock);
while(ready_queue.size() == 0)
{
pthread_cond_wait(selcond, qlock);
//pthread_cond_timedwait(selcond, qlock, &ts);
//printf("awoke\n");
}
//new connections + processed conns are in ready_queue
//add them to select list
while(ready_queue.size() > 0)
{
sel_list.push_back(ready_queue.front());
ready_queue.pop();
}
pthread_mutex_unlock(qlock);
//ready select set
for(std::vector<int>::const_iterator itr=sel_list.begin();
itr != sel_list.end(); itr++)
{
if(maxfd < *itr)
maxfd = *itr;
FD_SET(*itr, &readfs);
}
//select events
int sel_res = select(maxfd+1, &readfs, NULL, NULL, NULL);// &Timeout);
//printf("sel_res %d %d\n", sel_res, errno);
//fflush(stdout);
//job assignment to threads
for(int i=0; i<sel_list.size(); i++ )
{
int currsock = sel_list[i];
if (FD_ISSET(currsock, &readfs))
{
//printf("sock %d ready\n", currsock);
pthread_mutex_lock(qlock);
if(idleth_queue.size() > 0) //assign the job to an indle thread
{
pthread_item idle_th = idleth_queue.front();
idleth_queue.pop();
//wake up the thread to do work
pthread_mutex_lock(idle_th.data->th_mut);
//set the job of the idle thread
*(idle_th.data->workitem) = currsock;
pthread_cond_signal(idle_th.data->th_cond);
pthread_mutex_unlock(idle_th.data->th_mut);
}
else
{
//insert the given element to the work queue
work_queue.push(currsock);
printf("work queue size:\t%d\n", work_queue.size());
}
//remove from the sel_list
sel_list.erase(sel_list.begin()+i);
i--;
pthread_mutex_unlock(qlock);
}
}
}
}
void *serverLoop(void *args)
{
serverth_data *sdata = (serverth_data*)args;
int sockfd; //socket descriptor
struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr;
int newsockfd; //newly created
socklen_t clilen = sizeof(cli_addr);
//open a socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
printf("ERROR opening socket\n");
return 0;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(sdata->server_port);
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
{
printf("ERROR on binding.\n");
return 0;
}
//start listening on the server socket
//second arg is the max number of coonections waiting in queue
if(listen(sockfd,SOMAXCONN)==-1)
{
printf("ERROR on listen.\n");
return 0;
}
printf("LSM Server listenning...\n");
*(sdata->server_socket) = sockfd;
int flag, result;
while(true)
{
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0)
{
printf("ERROR on accept.\n");
return 0; // we probably want to continue instead of return here (when not debugging)
}
flag = 1;
result = setsockopt(newsockfd, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical
cruft */
sizeof(int)); /* length of option value */
if (result < 0)
{
printf("ERROR on setting socket option TCP_NODELAY.\n");
return 0;
}
char clientip[20];
inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
printf("Connection from:\t%s\n", clientip);
//printf("Number of idle threads %d\n", idleth_queue.size());
pthread_mutex_lock(sdata->qlock);
//insert the given element to the ready queue
sdata->ready_queue->push(newsockfd);
if(sdata->ready_queue->size() == 1) //signal the event loop
pthread_cond_signal(sdata->selcond);
pthread_mutex_unlock(sdata->qlock);
}
}
inline void readfromsocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += read( sockd, buf + n, count - n);
}
}
inline void writetosocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += write( sockd, buf + n, count - n);
}
}
void * thread_work_fn( void * args)
{
pthread_item * item = (pthread_item *) args;
pthread_mutex_lock(item->data->th_mut);
while(true)
{
while(*(item->data->workitem) == -1)
{
if(!*(item->data->sys_alive))
break;
pthread_cond_wait(item->data->th_cond, item->data->th_mut); //wait for job
}
if(!*(item->data->sys_alive))
{
//printf("thread quitted.\n");
break;
}
//step 1: read the opcode
uint8_t opcode;
ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t));
assert( n == sizeof(uint8_t));
assert( opcode < logserver::OP_INVALID );
if( opcode == logserver::OP_DONE ) //close the conn on failure
{
pthread_mutex_lock(item->data->qlock);
printf("client done. conn closed. (%d, %d, %d, %d)\n",
n, errno, *(item->data->workitem), item->data->work_queue->size());
close(*(item->data->workitem));
if(item->data->work_queue->size() > 0)
{
int new_work = item->data->work_queue->front();
item->data->work_queue->pop();
printf("work queue size:\t%d\n", item->data->work_queue->size());
*(item->data->workitem) = new_work;
}
else
{
//set work to -1
*(item->data->workitem) = -1;
//add self to idle queue
item->data->idleth_queue->push(*item);
}
pthread_cond_signal(item->data->selcond);
pthread_mutex_unlock(item->data->qlock);
continue;
}
//step 2: read the tuple from client
datatuple tuple;
tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t));
//read the key length
n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the data length
n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the key
tuple.key = (byte*) malloc(*tuple.keylen);
readfromsocket(*(item->data->workitem), (byte*) tuple.key, *tuple.keylen);
//read the data
if(!tuple.isDelete() && opcode != logserver::OP_FIND)
{
tuple.data = (byte*) malloc(*tuple.datalen);
readfromsocket(*(item->data->workitem), (byte*) tuple.data, *tuple.datalen);
}
else
tuple.data = 0;
//step 3: process the tuple
//pthread_mutex_lock(item->data->table_lock);
//readlock(item->data->table_lock,0);
if(opcode == logserver::OP_INSERT)
{
//insert/update/delete
item->data->ltable->insertTuple(tuple);
//unlock the lsmlock
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
//step 4: send response
uint8_t rcode = logserver::OP_SUCCESS;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
}
else if(opcode == logserver::OP_FIND)
{
//find the tuple
datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen);
//unlock the lsmlock
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
if(dt == 0) //tuple deleted
{
dt = (datatuple*) malloc(sizeof(datatuple));
dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen);
*dt->keylen = *tuple.keylen;
dt->datalen = dt->keylen + 1;
dt->key = (datatuple::key_t) (dt->datalen+1);
memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen);
dt->setDelete();
}
//send the reply code
uint8_t rcode = logserver::OP_SENDING_TUPLE;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
//send the tuple
writetosocket(*(item->data->workitem), (byte*) dt->keylen, dt->byte_length());
//free datatuple
free(dt->keylen);
free(dt);
}
//close the socket
//close(*(item->data->workitem));
//free the tuple
free(tuple.keylen);
free(tuple.datalen);
free(tuple.key);
free(tuple.data);
//printf("socket %d: work completed.\n", *(item->data->workitem));
pthread_mutex_lock(item->data->qlock);
//add conn desc to ready queue
item->data->ready_queue->push(*(item->data->workitem));
//printf("ready queue size: %d sock(%d)\n", item->data->ready_queue->size(), *(item->data->workitem));
if(item->data->ready_queue->size() == 1) //signal the event loop
pthread_cond_signal(item->data->selcond);
if(item->data->work_queue->size() > 0)
{
int new_work = item->data->work_queue->front();
item->data->work_queue->pop();
printf("work queue size:\t%d\n", item->data->work_queue->size());
*(item->data->workitem) = new_work;
}
else
{
//set work to -1
*(item->data->workitem) = -1;
//add self to idle queue
item->data->idleth_queue->push(*item);
}
pthread_mutex_unlock(item->data->qlock);
}
pthread_mutex_unlock(item->data->th_mut);
}

View file

@ -1,163 +0,0 @@
#ifndef _LOGSERVER_H_
#define _LOGSERVER_H_
#include <queue>
#include <vector>
//#include "logstore.h"
#include "datatuple.h"
#include <stasis/transactional.h>
#include <pthread.h>
#undef begin
#undef try
#undef end
class logtable;
struct pthread_item;
struct pthread_data {
std::queue<pthread_item> *idleth_queue;
std::queue<int> *ready_queue;
std::queue<int> *work_queue;
pthread_mutex_t * qlock;
pthread_cond_t *selcond;
pthread_cond_t * th_cond;
pthread_mutex_t * th_mut;
int *workitem; //id of the socket to work
//pthread_mutex_t * table_lock;
rwl *table_lock;
logtable *ltable;
bool *sys_alive;
};
struct pthread_item{
pthread_t * th_handle;
pthread_data *data;
};
//struct work_item
//{
// int sockd; //socket id
// datatuple in_tuple; //request
// datatuple out_tuple; //response
//};
struct serverth_data
{
int *server_socket;
int server_port;
std::queue<pthread_item> *idleth_queue;
std::queue<int> *ready_queue;
pthread_cond_t *selcond;
pthread_mutex_t *qlock;
};
void * thread_work_fn( void *);
class logserver
{
public:
//server codes
static uint8_t OP_SUCCESS;
static uint8_t OP_FAIL;
static uint8_t OP_SENDING_TUPLE;
//client codes
static uint8_t OP_FIND;
static uint8_t OP_INSERT;
static uint8_t OP_DONE;
static uint8_t OP_INVALID;
public:
logserver(int nthreads, int server_port){
this->nthreads = nthreads;
this->server_port = server_port;
//lsmlock = new pthread_mutex_t;
//pthread_mutex_init(lsmlock,0);
lsmlock = initlock();
qlock = new pthread_mutex_t;
pthread_mutex_init(qlock,0);
ltable = 0;
}
~logserver()
{
//delete lsmlock;
deletelock(lsmlock);
delete qlock;
}
void startserver(logtable *ltable);
void stopserver();
public:
private:
//main loop of server
//accept connections, assign jobs to threads
//void dispatchLoop();
void eventLoop();
private:
int server_port;
int nthreads;
bool sys_alive;
int serversocket; //server socket file descriptor
//ccqueue<int> conn_queue; //list of active connections (socket list)
//ccqueue<pthread_item> idleth_queue; //list of idle threads
std::queue<int> ready_queue; //connections to go inside select
std::queue<int> work_queue; //connections to be processed by worker threads
std::queue<pthread_item> idleth_queue;
pthread_mutex_t *qlock;
pthread_t server_thread;
serverth_data *sdata;
pthread_cond_t *selcond; //server loop cond
std::vector<pthread_item *> th_list; // list of threads
rwl *lsmlock; //lock for using lsm table
logtable *ltable;
};
#endif

View file

@ -1,409 +0,0 @@
#include "logserver.h"
#include "datatuple.h"
#include "logstore.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#undef begin
#undef end
#undef try
//server codes
uint8_t logserver::OP_SUCCESS = 1;
uint8_t logserver::OP_FAIL = 2;
uint8_t logserver::OP_SENDING_TUPLE = 3;
//client codes
uint8_t logserver::OP_FIND = 4;
uint8_t logserver::OP_INSERT = 5;
uint8_t logserver::OP_INVALID = 32;
void logserver::startserver(logtable *ltable)
{
sys_alive = true;
this->ltable = ltable;
//initialize threads
for(int i=0; i<nthreads; i++)
{
struct pthread_item *worker_th = new pthread_item;
th_list.push_back(worker_th);
worker_th->th_handle = new pthread_t;
struct pthread_data *worker_data = new pthread_data;
worker_th->data = worker_data;
worker_data->idleth_queue = &idleth_queue;
worker_data->conn_queue = &conn_queue;
worker_data->qlock = qlock;
worker_data->th_cond = new pthread_cond_t;
pthread_cond_init(worker_data->th_cond,0);
worker_data->th_mut = new pthread_mutex_t;
pthread_mutex_init(worker_data->th_mut,0);
worker_data->workitem = new int;
*(worker_data->workitem) = -1;
worker_data->table_lock = lsmlock;
worker_data->ltable = ltable;
worker_data->sys_alive = &sys_alive;
pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th);
idleth_queue.push(*worker_th);
}
dispatchLoop();
}
void logserver::stopserver()
{
//close the server socket
//stops receiving data on the server socket
shutdown(serversocket, 0);
//wait for all threads to be idle
while(idleth_queue.size() != nthreads)
sleep(1);
//set the system running flag to false
sys_alive = false;
for(int i=0; i<nthreads; i++)
{
pthread_item *idle_th = th_list[i];
//wake up the thread
pthread_mutex_lock(idle_th->data->th_mut);
pthread_cond_signal(idle_th->data->th_cond);
pthread_mutex_unlock(idle_th->data->th_mut);
//wait for it to join
pthread_join(*(idle_th->th_handle), 0);
//free the thread variables
pthread_cond_destroy(idle_th->data->th_cond);
delete idle_th->data->th_cond;
delete idle_th->data->th_mut;
delete idle_th->data->workitem;
delete idle_th->data;
delete idle_th->th_handle;
}
th_list.clear();
return;
}
void logserver::dispatchLoop()
{
int sockfd; //socket descriptor
struct sockaddr_in serv_addr;
struct sockaddr_in cli_addr;
int newsockfd; //newly created
socklen_t clilen = sizeof(cli_addr);
//open a socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
printf("ERROR opening socket\n");
return;
}
bzero((char *) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
serv_addr.sin_port = htons(server_port);
if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0)
{
printf("ERROR on binding.\n");
return;
}
//start listening on the server socket
//second arg is the max number of coonections waiting in queue
if(listen(sockfd,SOMAXCONN)==-1)
{
printf("ERROR on listen.\n");
return;
}
printf("LSM Server listenning...\n");
serversocket = sockfd;
int flag, result;
while(true)
{
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);
if (newsockfd < 0)
{
printf("ERROR on accept.\n");
return; // we probably want to continue instead of return here (when not debugging)
}
flag = 1;
result = setsockopt(newsockfd, /* socket affected */
IPPROTO_TCP, /* set option at TCP level */
TCP_NODELAY, /* name of option */
(char *) &flag, /* the cast is historical
cruft */
sizeof(int)); /* length of option value */
if (result < 0)
{
printf("ERROR on setting socket option TCP_NODELAY.\n");
return;
}
char clientip[20];
inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20);
//printf("Connection from:\t%s\n", clientip);
//printf("Number of idle threads %d\n", idleth_queue.size());
pthread_mutex_lock(qlock);
if(idleth_queue.size() > 0)
{
pthread_item idle_th = idleth_queue.front();
idleth_queue.pop();
//wake up the thread to do work
pthread_mutex_lock(idle_th.data->th_mut);
//set the job of the idle thread
*(idle_th.data->workitem) = newsockfd;
pthread_cond_signal(idle_th.data->th_cond);
pthread_mutex_unlock(idle_th.data->th_mut);
}
else
{
//insert the given element to the queue
conn_queue.push(newsockfd);
//printf("Number of queued connections:\t%d\n", conn_queue.size());
}
pthread_mutex_unlock(qlock);
/*
try
{
pthread_item idle_th = idleth_queue.pop();
//wake up the thread to do work
pthread_mutex_lock(idle_th.data->th_mut);
//set the job of the idle thread
*(idle_th.data->workitem) = newsockfd;
pthread_cond_signal(idle_th.data->th_cond);
pthread_mutex_unlock(idle_th.data->th_mut);
}
catch(int empty_exception)
{
//insert the given element to the queue
conn_queue.push(newsockfd);
//printf("Number of queued connections:\t%d\n", conn_queue.size());
}
*/
}
}
inline void readfromsocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += read( sockd, buf + n, count - n);
}
}
inline void writetosocket(int sockd, byte *buf, int count)
{
int n = 0;
while( n < count )
{
n += write( sockd, buf + n, count - n);
}
}
void * thread_work_fn( void * args)
{
pthread_item * item = (pthread_item *) args;
pthread_mutex_lock(item->data->th_mut);
while(true)
{
while(*(item->data->workitem) == -1)
{
if(!*(item->data->sys_alive))
break;
pthread_cond_wait(item->data->th_cond, item->data->th_mut); //wait for job
}
if(!*(item->data->sys_alive))
{
//printf("thread quitted.\n");
break;
}
//step 1: read the opcode
uint8_t opcode;
ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t));
assert( n == sizeof(uint8_t));
assert( opcode < logserver::OP_INVALID );
//step 2: read the tuple from client
datatuple tuple;
tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t));
tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t));
//read the key length
n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the data length
n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t));
assert( n == sizeof(uint32_t));
//read the key
tuple.key = (byte*) malloc(*tuple.keylen);
readfromsocket(*(item->data->workitem), (byte*) tuple.key, *tuple.keylen);
//read the data
if(!tuple.isDelete() && opcode != logserver::OP_FIND)
{
tuple.data = (byte*) malloc(*tuple.datalen);
readfromsocket(*(item->data->workitem), (byte*) tuple.data, *tuple.datalen);
}
else
tuple.data = 0;
//step 3: process the tuple
//pthread_mutex_lock(item->data->table_lock);
//readlock(item->data->table_lock,0);
if(opcode == logserver::OP_INSERT)
{
//insert/update/delete
item->data->ltable->insertTuple(tuple);
//unlock the lsmlock
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
//step 4: send response
uint8_t rcode = logserver::OP_SUCCESS;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
}
else if(opcode == logserver::OP_FIND)
{
//find the tuple
datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen);
//unlock the lsmlock
//pthread_mutex_unlock(item->data->table_lock);
//unlock(item->data->table_lock);
if(dt == 0) //tuple deleted
{
dt = (datatuple*) malloc(sizeof(datatuple));
dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen);
*dt->keylen = *tuple.keylen;
dt->datalen = dt->keylen + 1;
dt->key = (datatuple::key_t) (dt->datalen+1);
memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen);
dt->setDelete();
}
//send the reply code
uint8_t rcode = logserver::OP_SENDING_TUPLE;
n = write(*(item->data->workitem), &rcode, sizeof(uint8_t));
assert(n == sizeof(uint8_t));
//send the tuple
writetosocket(*(item->data->workitem), (byte*) dt->keylen, dt->byte_length());
//free datatuple
free(dt->keylen);
free(dt);
}
//close the socket
close(*(item->data->workitem));
//free the tuple
free(tuple.keylen);
free(tuple.datalen);
free(tuple.key);
free(tuple.data);
//printf("socket %d: work completed.\n", *(item->data->workitem));
pthread_mutex_lock(item->data->qlock);
if(item->data->conn_queue->size() > 0)
{
int new_work = item->data->conn_queue->front();
item->data->conn_queue->pop();
*(item->data->workitem) = new_work;
}
else
{
//set work to -1
*(item->data->workitem) = -1;
//add self to idle queue
item->data->idleth_queue->push(*item);
}
pthread_mutex_unlock(item->data->qlock);
/*
//check if there is new work this thread can do
try
{
int new_work = item->data->conn_queue->pop();
*(item->data->workitem) = new_work; //set new work
//printf("socket %d: new work found.\n", *(item->data->workitem));
}
catch(int empty_exception)
{
//printf("socket %d: no new work found.\n", *(item->data->workitem));
//set work to -1
*(item->data->workitem) = -1;
//add self to idle queue
item->data->idleth_queue->push(*item);
}
*/
}
pthread_mutex_unlock(item->data->th_mut);
}

View file

@ -1,198 +0,0 @@
#ifndef _LOGSERVER_H_
#define _LOGSERVER_H_
#include <queue>
#include <vector>
//#include "logstore.h"
#include "datatuple.h"
#include <stasis/transactional.h>
#include <pthread.h>
#undef begin
#undef try
#undef end
class logtable;
template<class T>
class ccqueue
{
public:
ccqueue()
{
qmut = new pthread_mutex_t;
pthread_mutex_init(qmut,0);
}
int size()
{
pthread_mutex_lock(qmut);
int qsize = m_queue.size();
pthread_mutex_unlock(qmut);
return qsize;
}
//inserts a copy of the given element to the queue
void push(const T &item)
{
pthread_mutex_lock(qmut);
m_queue.push(item);
pthread_mutex_unlock(qmut);
return;
}
//returns a copy of the next element
//deletes the copy in the queue
//throws an exception with -1 on empty queue
T pop() throw (int)
{
pthread_mutex_lock(qmut);
if(m_queue.size() > 0)
{
T item = m_queue.front();
m_queue.pop();
pthread_mutex_unlock(qmut);
return item;
}
pthread_mutex_unlock(qmut);
throw(-1);
}
~ccqueue()
{
delete qmut;
}
private:
std::queue<T> m_queue;
pthread_mutex_t *qmut;
};
struct pthread_item;
struct pthread_data {
std::queue<pthread_item> *idleth_queue;
std::queue<int> *conn_queue;
pthread_mutex_t * qlock;
pthread_cond_t * th_cond;
pthread_mutex_t * th_mut;
int *workitem; //id of the socket to work
//pthread_mutex_t * table_lock;
rwl *table_lock;
logtable *ltable;
bool *sys_alive;
};
struct pthread_item{
pthread_t * th_handle;
pthread_data *data;
};
struct work_item
{
int sockd; //socket id
datatuple in_tuple; //request
datatuple out_tuple; //response
};
void * thread_work_fn( void *);
class logserver
{
public:
//server codes
static uint8_t OP_SUCCESS;
static uint8_t OP_FAIL;
static uint8_t OP_SENDING_TUPLE;
//client codes
static uint8_t OP_FIND;
static uint8_t OP_INSERT;
static uint8_t OP_INVALID;
public:
logserver(int nthreads, int server_port){
this->nthreads = nthreads;
this->server_port = server_port;
//lsmlock = new pthread_mutex_t;
//pthread_mutex_init(lsmlock,0);
lsmlock = initlock();
qlock = new pthread_mutex_t;
pthread_mutex_init(qlock,0);
ltable = 0;
}
~logserver()
{
//delete lsmlock;
deletelock(lsmlock);
delete qlock;
}
void startserver(logtable *ltable);
void stopserver();
public:
private:
//main loop of server
//accept connections, assign jobs to threads
void dispatchLoop();
private:
int server_port;
int nthreads;
bool sys_alive;
int serversocket; //server socket file descriptor
//ccqueue<int> conn_queue; //list of active connections (socket list)
//ccqueue<pthread_item> idleth_queue; //list of idle threads
std::queue<int> conn_queue;
std::queue<pthread_item> idleth_queue;
pthread_mutex_t *qlock;
std::vector<pthread_item *> th_list; // list of threads
rwl *lsmlock; //lock for using lsm table
logtable *ltable;
};
#endif