From 78f9ad12c1a076aaada694825717f4a473e0f5df Mon Sep 17 00:00:00 2001 From: sears Date: Tue, 26 Jan 2010 00:37:05 +0000 Subject: [PATCH] removed dead files, switch to CMake git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@523 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- LogUtils.cc | 77 ----- LogUtils.h | 130 --------- Makefile | 74 ----- UCharUtils.cc | 326 --------------------- UCharUtils.h | 139 --------- hello.cpp | 48 --- logserver_pers.cpp | 519 --------------------------------- logserver_pers.h | 163 ----------- logserver_simple.cpp | 409 -------------------------- logserver_simple.h | 198 ------------- check_server.cpp => server.cpp | 0 11 files changed, 2083 deletions(-) delete mode 100644 LogUtils.cc delete mode 100644 LogUtils.h delete mode 100644 Makefile delete mode 100644 UCharUtils.cc delete mode 100644 UCharUtils.h delete mode 100644 hello.cpp delete mode 100644 logserver_pers.cpp delete mode 100644 logserver_pers.h delete mode 100644 logserver_simple.cpp delete mode 100644 logserver_simple.h rename check_server.cpp => server.cpp (100%) diff --git a/LogUtils.cc b/LogUtils.cc deleted file mode 100644 index 3dea981..0000000 --- a/LogUtils.cc +++ /dev/null @@ -1,77 +0,0 @@ -/*! \file log4_util.cc - * \brief This file has the helper functions for log4cpp; - * - * Copyright (c) 2008 Yahoo, Inc. - * All rights reserved. - */ -#include -#include - -#include "LogUtils.h" - -using namespace log4cpp; -using namespace std; - -// hacked link to actioncontext -std::string s_trackPathLog; - -LogMethod:: -LogMethod(log4cpp::Category& log, log4cpp::Priority::Value priority, - const char *function) : - log_(log), priority_(priority), function_(function) -{ - if(log_.isPriorityEnabled(priority_)) { - log_.getStream(priority_) << "Entering: " << function_; - } -} - - -LogMethod:: -~LogMethod() -{ - if(log_.isPriorityEnabled(priority_)) { - log_.getStream(priority_) << "Exiting: " << function_; - } -} - -// Protects against multiple calls (won't try to re-init) and gives -// back the same answer the original call got. -static int log4cppInitResult = -1; - -bool -initLog4cpp(const string &confFile) -{ - - if (log4cppInitResult != -1) { - return (log4cppInitResult == 0 ? true : false); - } - - log4cppInitResult = 0; // Assume success. - try { - PropertyConfigurator::configure(confFile); - } catch (log4cpp::ConfigureFailure &e) { - cerr << "log4cpp configuration failure while loading '" << - confFile << "' : " << e.what() << endl; - log4cppInitResult = 1; - } catch (std::exception &e) { - cerr << "exception caught while configuring log4cpp via '" << - confFile << "': " << e.what() << endl; - log4cppInitResult = 1; - } catch (...) { - cerr << "unknown exception while configuring log4cpp via '" << - confFile << "'." << endl; - log4cppInitResult = 1; - } - - return (log4cppInitResult == 0 ? true : false); -} - -/* - * For customized vim control - * Local variables: - * tab-width: 4 - * c-basic-offset: 4 - * End: - * vim600: sw=4:ts=4:et - * vim<600: sw=4:ts=4:et - */ diff --git a/LogUtils.h b/LogUtils.h deleted file mode 100644 index 73c0af6..0000000 --- a/LogUtils.h +++ /dev/null @@ -1,130 +0,0 @@ -/* Copyright (C) 2007 Yahoo! Inc. All Rights Reserved. */ - -#ifndef LOG_UTIL_H -#define LOG_UTIL_H - -#include -#include "StringUtils.h" - -/** - * Quick and dirty link between LogUtils and ActionContext without having to - * resolve cross-inclusion issues, or force all components to start including - * ActionContext if they don't already. - */ -extern std::string s_trackPathLog; - -// These macros cannot be protected by braces because of the trailing stream -// arguments that get appended. Care must taken not to use them inside if/else -// blocks that do not use curly braces. -// I.e., the following will give unexpected results: -// if(foo) -// DHT_DEBUG_STREAM() << "heyheyhey"; -// else -// blah(); -// The 'else' will end up applying to the 'if' within the debug macro. -// Regardless of this, our standards say to always use curly brackets -// on every block anyway, no matter what. - -#define DHT_DEBUG_STREAM() if(log.isDebugEnabled()) log.debugStream() << __FUNCTION__ << "():" << __LINE__ << ":" -#define DHT_INFO_STREAM() if(log.isInfoEnabled()) log.infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" -#define DHT_INFO_WITH_STACK_STREAM() if(log.isInfoEnabled()) log.infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_WARN_STREAM() if(log.isWarnEnabled()) log.warnStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_ERROR_STREAM() if(log.isErrorEnabled()) log.errorStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_CRIT_STREAM() if(log.isCritEnabled()) log.critStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_TRACE_PRIORITY log4cpp::Priority::DEBUG + 50 -#define DHT_TRACE_STREAM() if (log.isPriorityEnabled(DHT_TRACE_PRIORITY)) log.getStream(DHT_TRACE_PRIORITY) << __FUNCTION__ << "():" << __LINE__ << ":" - -// Sadly, sometimes 'log' is reserved by someone else so the code needs to -// use a different name for log. In that case, it can be passed in to these. -#define DHT_DEBUG_STREAML(x_log_hdl_x) if((x_log_hdl_x).isDebugEnabled()) (x_log_hdl_x).debugStream() << __FUNCTION__ << "():" << __LINE__ << ":" -#define DHT_INFO_STREAML(x_log_hdl_x) if((x_log_hdl_x).isInfoEnabled()) (x_log_hdl_x).infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" -#define DHT_INFO_WITH_STACK_STREAML(x_log_hdl_x) if((x_log_hdl_x).isInfoEnabled()) (x_log_hdl_x).infoStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_WARN_STREAML(x_log_hdl_x) if((x_log_hdl_x).isWarnEnabled()) (x_log_hdl_x).warnStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_ERROR_STREAML(x_log_hdl_x) if((x_log_hdl_x).isErrorEnabled()) (x_log_hdl_x).errorStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_CRIT_STREAML(x_log_hdl_x) if((x_log_hdl_x).isCritEnabled()) (x_log_hdl_x).critStream() << __FUNCTION__ << "():" << __LINE__ << ":" << s_trackPathLog -#define DHT_TRACE_STREAML(x_log_hdl_x) if ((x_log_hdl_x).isPriorityEnabled(DHT_TRACE_PRIORITY)) (x_log_hdl_x).getStream(DHT_TRACE_PRIORITY) << __FUNCTION__ << "():" << __LINE__ << ":" - -//Macros to use when a function returns on error without writing any log message -// or error translation -#define RETURN_IF_NOT_OK(x_call_x) \ -{ \ - FwCode::ResponseCode rcx___ = (x_call_x); \ - if(rcx___ != FwCode::FwOk) { \ - return rcx___; \ - } \ -} - -#define RETURN_THIS_IF_NOT_OK(x_othercode_x, x_call_x) \ -{ \ - FwCode::ResponseCode rcx___ = (x_call_x); \ - if(rcx___ != FwCode::FwOk) { \ - return (x_othercode_x); \ - } \ -} - -/// Caution! Only use in checks for 'impossible' code conditions. Regular errors -/// should be handled regularly -#define BAD_CODE_ABORT() \ - { \ - std::string x_msg_x("Bad code at " __FILE__ ":"); \ - x_msg_x.append(StringUtils::toString(__LINE__)); \ - throw std::runtime_error(x_msg_x); \ - } - -#define BAD_CODE_IF_NOT_OK(x_call_x) \ - do {\ - if((x_call_x) != FwCode::FwOk) { \ - BAD_CODE_ABORT(); \ - } \ - } while(0) - -/* - * Above macros are meant to be used by all components. - */ - -/** - * Class that allows for method entry/exit logging with a single declaration. - * Always uses debug. - */ -class LogMethod -{ - public: - LogMethod(log4cpp::Category& log, log4cpp::Priority::Value priority, - const char *function); - virtual ~LogMethod(); - - private: - log4cpp::Category& log_; - log4cpp::Priority::Value priority_; - const char *function_; -}; - -// convenience macros to use the above class -#define LOG_METHOD() LogMethod log_method_entry_exit(log, log4cpp::Priority::DEBUG, __FUNCTION__) -#define TRACE_METHOD() LogMethod log_method_entry_exit(log, DHT_TRACE_PRIORITY, __FUNCTION__) - -/** Initialize log4cpp config file. - * This function needs to be called once for each executable. Multiple - * initializations will return the result of the first initialization (IOW, - * an executable can be initialized with exactly one config file). Errors - * encountered by this function are printed onto cerr. See log4cpp - * documentation for what happens when PropertyConfigurator::configure() - * fails. - * \param confFile is the path name of the log4cpp config file. - * Depending on the machine that the executable is running in, the path - * will be different. - * \return true if the initialization succeeds, false if it fails. - */ -bool initLog4cpp(const std::string & confFile); - -#endif - -/* - * For customized vim control - * Local variables: - * tab-width: 4 - * c-basic-offset: 4 - * End: - * vim600: sw=4:ts=4:et - * vim<600: sw=4:ts=4:et - */ diff --git a/Makefile b/Makefile deleted file mode 100644 index 7fcb172..0000000 --- a/Makefile +++ /dev/null @@ -1,74 +0,0 @@ -STASIS_DIR=../stasis - -LIB=$(STASIS_DIR)/build/src/stasis \ - -L/home/y/lib -INCLUDE=-I$(STASIS_DIR)/src/ -I$(STASIS_DIR) -I./ \ - -I/home/y/include - -LIBLIST=-lpthread \ - -lstasis \ - -lm -# -licui18n \ -# -licuuc \ -# -licudata \ -# -licuio \ -# -llog4cpp_y \ -# -lthoth - -FLAGS=-pg -g -O1 -#FLAGS=-O3 - -HFILES=logserver.h logstore.h logiterators.h datapage.h merger.h tuplemerger.h datatuple.h -CFILES=logserver.cpp logstore.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp - - -# STASIS_DIR=../stasis -# LD_LIBRARY_PATH=$STASIS_DIR/build/src/stasis -# LD_LIBRARY_PATH=$STASIS_DIR/build/src/stasis ./hello - - -logstore: check_gen.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -test: dp_check lt_check ltable_check merger_check rb_check \ - lmerger_check tmerger_check server_check tcpclient_check - -lt_check: check_logtree.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -dp_check: check_datapage.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -ltable_check: check_logtable.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -merger_check: check_merge.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -lmerger_check: check_mergelarge.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -tmerger_check: check_mergetuple.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -rb_check: check_rbtree.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -server_check: check_server.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -tcpclient_check: check_tcpclient.cpp $(HFILES) $(CFILES) - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - - -hello : hello.cpp UCharUtils.cc LogUtils.cc - g++ -o $@ $^ -L$(LIB) $(INCLUDE) $(LIBLIST) $(FLAGS) - -clean: - rm -f logstore server_check hello lt_check merger_check lmerger_check rb_check \ - dp_check ltable_check tmerger_check rose tcpclient_check -veryclean: clean - rm -f *~ gmon.out prof.res - - - diff --git a/UCharUtils.cc b/UCharUtils.cc deleted file mode 100644 index 2133034..0000000 --- a/UCharUtils.cc +++ /dev/null @@ -1,326 +0,0 @@ -/* $Id: UCharUtils.cc,v 1.16 2009/03/03 20:19:18 dlomax Exp $ */ -/* Copyright (C) 2008 Yahoo! Inc. All Rights Reserved. */ - -//#include -#include "UCharUtils.h" -#include -#include "LogUtils.h" -//#include "ActionContext.h" -#include -#include -#include // To make sure we have UTF-8 - -static log4cpp::Category &log = - log4cpp::Category::getInstance("dht.framework." __FILE__); - - -UCharUtilsImpl *UCharUtils::instance_ = NULL; - -UCharUtilsImpl:: -UCharUtilsImpl() : uconv_(NULL) { - LOG_METHOD(); - - ucBuffLen = 0; - ucBuff = NULL; - - ucNormBuffLen = 0; - ucNormBuff = NULL; - - charBuffLen = 0; - charBuff = NULL; -} - -FwCode::ResponseCode UCharUtilsImpl:: -init() -{ - UErrorCode erc = U_ZERO_ERROR; - - uconv_ = ucnv_open("utf-8", &erc); - if (uconv_ == NULL) { - DHT_ERROR_STREAM() << "EC:UNICODE:Problem geting utf-8 converter, erc:" << erc - << ", " << u_errorName(erc); - return FwCode::UcnvOpenFailed; - } - return FwCode::FwOk; -} - -UCharUtilsImpl:: -~UCharUtilsImpl() { - reset(); - if (uconv_ != NULL) { - ucnv_close(uconv_); - uconv_ = NULL; - } -} - -void UCharUtilsImpl:: -reset() { - LOG_METHOD(); - - if (ucBuff != NULL) { - delete[] ucBuff; - ucBuffLen = 0; - ucBuff = NULL; - } - if (ucNormBuff != NULL) { - delete[] ucNormBuff; - ucNormBuffLen = 0; - ucNormBuff = NULL; - } - if (charBuff != NULL) { - delete[] charBuff; - charBuffLen = 0; - charBuff = NULL; - } -} - -/** - * Small wrapper to hide multi-line thoth api inside single-line call. - */ -bool UCharUtils:: -isUTF8(const std::string& value) -{ - size_t pos = 0; - thoth_result result = thoth_validate_utf8(value.c_str(), value.length(), - &pos); - - if(result != UTF8_VALID) { - std::cerr - //RESPONSE_DEBUG_STREAM(FwCode::DataNotUtf8) - << "value (" << value << ") is not UTF-8. thoth_result:" << result - << ", position=" << pos; - return false; - } - return true; -} - -/** - * Small wrapper to hide multi-line thoth api inside single-line call. - */ -bool UCharUtils:: -isUTF8(const char * value, size_t value_len) -{ - size_t pos = 0; - thoth_result result = thoth_validate_utf8(value, value_len, &pos); - - if(result != UTF8_VALID) { - //RESPONSE_DEBUG_STREAM(FwCode::DataNotUtf8) - std::cerr - << "value (" << std::string(value, value_len) - << ") is not UTF-8. thoth_result:" << result - << ", position=" << pos; - return false; - } - return true; -} - -// Convert an input string (expected to be UTF-8) into unicode UChars -// The result of the conversion will be sitting in our ucBuff area. -FwCode::ResponseCode UCharUtilsImpl:: -convert(const std::string &input, int32_t &len) -{ - LOG_METHOD(); - - //UTF-8 validation - if(!UCharUtils::isUTF8(input)) { - return FwCode::DataNotUtf8; - } - - int size = input.length() * 2; - - // Check if we already have a big enough buffer - if (ucBuffLen < size) { - // Nope, first check if we need to release what we've been using - if (ucBuff) { - delete[] ucBuff; - } - ucBuffLen = size; - ucBuff = new UChar[ucBuffLen]; - } - - UErrorCode erc = U_ZERO_ERROR; - len = ucnv_toUChars(uconv_, - ucBuff, - ucBuffLen, - input.data(), - input.length(), &erc); - - if (U_FAILURE(erc)) { - //RESPONSE_ERROR_STREAM(FwCode::ConvertToUCharFailed) - std::cerr - << "EC:UNICODE:error:" << erc - << ", " << u_errorName(erc) - << " from converting input:'" << input << "'"; - len = 0; - return FwCode::ConvertToUCharFailed; - } - return FwCode::FwOk; -} - -// Normalize an input string. Note that all three internal buffers will -// be used by this operation, but by the time we finish, we'll be done -// with them. -FwCode::ResponseCode UCharUtilsImpl:: -normalize(const std::string &input, std::string &result /* out */) -{ - LOG_METHOD(); - - // convert our UTF-8 into UChar - int32_t inLen = 0; - FwCode::ResponseCode rc = convert(input, inLen); - - if (rc != FwCode::FwOk) { - result.erase(); - return rc; - } - - // Do a quick check if the input is already normalized so that - // we can duck out early - UErrorCode status = U_ZERO_ERROR; - if (unorm_quickCheck(ucBuff, inLen, - UNORM_NFC, &status) == UNORM_YES) { - DHT_DEBUG_STREAM() << "already normalized input:" << input; - result = input; - return FwCode::FwOk; - } - - // Check if we have enough space for the normalized result. - // We'll make the output space twice as big as the input (although - // it's more likely that the normalized result will be shorter - // as it combines characters. E.g. 'A' 'put an accent on the previous' - int32_t newSize = inLen * 2; - if (newSize > ucNormBuffLen) { - DHT_DEBUG_STREAM() << "newSize:" << newSize - << " ucNormBuffLen:" << ucNormBuffLen; - if (ucNormBuff) { - delete[] ucNormBuff; - } - ucNormBuffLen = newSize; - ucNormBuff = new UChar[ucNormBuffLen]; - } - - // Do the actual normalization - status = U_ZERO_ERROR; - int32_t normLen = unorm_normalize(ucBuff, inLen, - UNORM_NFC, 0, - ucNormBuff, - ucNormBuffLen, - &status); - if(U_FAILURE(status)) { - //RESPONSE_ERROR_STREAM(FwCode::FwError) - std::cerr - << "EC:UNICODE:error:" << status << ", " << u_errorName(status) - <<" in unorm_normalize, inLen:" << inLen - << " ucNormBuffLen:" << ucNormBuffLen; - return FwCode::FwError; - } - - // Make sure we have some space to convert back to UTF-8 - int32_t resultLen = normLen * 4; - if (resultLen > charBuffLen) { - DHT_DEBUG_STREAM() << "resultLen:" << resultLen - << " charBuffLen:" << charBuffLen; - if (charBuff) { - delete[] charBuff; - charBuff= NULL; - } - charBuffLen = resultLen; - charBuff = new char[charBuffLen]; - } - - DHT_DEBUG_STREAM() <<"calling ucnv_fromUChars, normLen:" << normLen; - - // Go from UChar array to UTF-8 - int32_t actualLen = ucnv_fromUChars(uconv_, - charBuff, charBuffLen, - ucNormBuff, normLen, - &status); - if(U_FAILURE(status)) { - //RESPONSE_ERROR_STREAM(FwCode::FwError) - std::cerr - << "EC:UNICODE:error:" << status << ", " << u_errorName(status) - << " in ucnv_fromUChars charBuffLen:" << charBuffLen - << " normLen:" << normLen; - return FwCode::FwError; - } - - // Smack our UTF-8 characters into the result string - result.assign(charBuff, actualLen); - DHT_DEBUG_STREAM() << "leaving actualLen:" << actualLen - << " result:" << result; - return FwCode::FwOk; -} - - -FwCode::ResponseCode UCharUtils:: -init() -{ - if (instance_ == NULL) { - instance_ = new UCharUtilsImpl(); - return instance_->init(); - } - return FwCode::FwOk; // already initialized -} - -void UCharUtils:: -close() -{ - if(instance_ != NULL) { - delete instance_; - instance_ = NULL; - } -} - -// Given an input string, return a unicode UChar array. Note that the -// return value is a pointer to our internal buffer. -UChar * UCharUtils:: -getUChar(const std::string &input, int32_t& len) { - LOG_METHOD(); - - // do the conversion...somehow need 2x input len for utf8 to utf16 - if(instance_->convert(input, len) != FwCode::FwOk) { - len = 0; - return NULL; - } - - return instance_->ucBuff; -} - -FwCode::ResponseCode UCharUtils:: -normalize(const std::string &input, std::string &result) { - LOG_METHOD(); - return(instance_->normalize(input, result)); -} - - -FwCode::ResponseCode UCharUtils:: -parseRegExpPattern(const std::string &pattern, - URegularExpression * & result /* out */) -{ - UParseError perr; - UErrorCode erc = U_ZERO_ERROR; - int32_t ureglen = 0; - - // Do not delete uregexp, it's a static reusable buffer inside UCharUtils - UChar *uregexp = UCharUtils::getUChar(pattern, ureglen); - if (uregexp == NULL) { - //RESPONSE_ERROR_STREAM(FwCode::ConvertToUCharFailed) - std::cerr - << "EC:UNICODE|IMPOSSIBLE:Unable to convert pattern to unicode: " << pattern; - return FwCode::ConvertToUCharFailed; - } - - URegularExpression *regexp= uregex_open(uregexp, ureglen, 0, - &perr, - &erc); - if(erc != U_ZERO_ERROR) { - //RESPONSE_DEBUG_STREAM(FwCode::CompileRegExFailed) - std::cerr - << "Compiling regex failed at: " << perr.offset - << "; re=" << pattern; - return FwCode::CompileRegExFailed; - } - - result = regexp; - return FwCode::FwOk; -} diff --git a/UCharUtils.h b/UCharUtils.h deleted file mode 100644 index 4f751be..0000000 --- a/UCharUtils.h +++ /dev/null @@ -1,139 +0,0 @@ -/* Copyright (C) 2008 Yahoo! Inc. All Rights Reserved. */ - -#ifndef UCHAR_UTILS_H -#define UCHAR_UTILS_H - -#include -#include -#include "FwCode.h" -#include - -// Forward declaration -class UCharUtilsImpl; - -/** - * Some handy utilities for working with unicode characters. Yes, these - * could have just been some regular routines instead of static methods - * in a class, but doing it this way gives us some containment of what - * other static tidbits might be necessary (like reusable buffer space). - * which are all hidden within the UCharUtilsImpl class. - * - * This is a singleton - do not use in a threaded program. - */ -class UCharUtils { - private: - - /** - * Our pointer to all sorts of goodness. - */ - static UCharUtilsImpl *instance_; - public: - - /** - * Initialize the utilities. Primarily opens the utf-8 converter. - * Calling this is required prior to using the converter. - * - * @return FwCode::FwOk on success, FwCode::UcnvOpenFailed on - * failure. - */ - static FwCode::ResponseCode init(); - - /** - * Release all resources. init() must be called again - * in order to use again. - */ - static void close(); - - /** - * Small wrapper to hide multi-line thoth api inside single-line call. - * - * @param value string to be tested for utf-8-ness - * @return true if it is utf-8, false if not - */ - static bool isUTF8(const std::string& value); - - /** - * Small wrapper to hide multi-line thoth api inside single-line call. - * - * @param value char string to be tested for utf-8-ness - * @param value_len length of value - * @return true if it is utf-8, false if not - */ - static bool isUTF8(const char * value, size_t value_len); - - /** - * Convert utf-8 strings into UChar strings. Note that the - * result is an internal reusable buffer so the caller should - * *not* release it. - * @param input utf-8 string to convert - * @param len set to length of output string - * @return NULL if anything bad happens, otherwise an allocated UChar * - * the caller must *NEVER* free this pointer. - */ - static UChar * getUChar(const std::string &input, int32_t& len); - - /** - * Do a NFC normalization so that different yet equivalent strings - * will have a single representation. See - * http://www.unicode.org/unicode/reports/tr15/ - * for more information. - * @param input A UTF-8 string that we want to normalize - * @param result (output) the normalized UTF-8 string - * @return FwCode::FwOk on success, - * FwCode::FwError on conversion failure, - * FwCode::InvalidData if input was not utf-8 - */ - static FwCode::ResponseCode normalize(const std::string &input, - std::string &result); - - /** - * Compile a regular expression in a unicode-friendly way. - * - * @param pattern the regexp pattern to compile. Assumed to - * be utf-8. - * @param result (output) Set to point to the compiled regexp. - * Must be released by the caller via uregex_close() when - * finished with it. - * @return FwCode::FwOk if compilation succeeded, - * FwCode::CompileRegExFailed or FwCode::ConvertToUCharFailed - * on failure. - */ - static FwCode::ResponseCode parseRegExpPattern - (const std::string &pattern, - URegularExpression * & result /* out */); - -}; - -/** - * Bug 2574599 - Impl exposed for use by multiple threads; singleton not - * appropriate for multi-threaded program. - */ -class UCharUtilsImpl -{ -private: - UConverter *uconv_; - -public: - UCharUtilsImpl(); - ~UCharUtilsImpl(); - - FwCode::ResponseCode init(); - void reset(); - FwCode::ResponseCode convert(const std::string &input, int32_t &len); - - FwCode::ResponseCode normalize(const std::string &nput, std::string &result); - - // Buffer used to convert from UTF-* into UChar - int32_t ucBuffLen; - UChar *ucBuff; - - // Buffer used for UChar normalization output - int32_t ucNormBuffLen; - UChar *ucNormBuff; - - // Buffer used to convert UChars back to UTF-8 - int32_t charBuffLen; - char *charBuff; -}; - -#endif // _DHT_UCHAR_UTILS_ diff --git a/hello.cpp b/hello.cpp deleted file mode 100644 index 118fccb..0000000 --- a/hello.cpp +++ /dev/null @@ -1,48 +0,0 @@ - -#include -#include -#include -#include - -typedef unsigned char uchar; -typedef struct datatuple -{ - - typedef byte* key_t; - typedef byte* data_t; - uint32_t keylen; - uint32_t datalen; - key_t key; - data_t data; - - -}; - -int main(int argc, char** argv) { - -bool * m1 = new bool(false); -std::cout << *m1 << std::endl; - - datatuple t; - std::cout << "size of datatuple:\t" << sizeof(datatuple) << std::endl; - - t.key = (datatuple::key_t) malloc(10); - const char * str = "12345678"; - strcpy((char*)t.key, (str)); - - t.keylen = strlen((char*)t.key); - - t.data = (datatuple::data_t) malloc(10); - const char * str2 = "1234567"; - strcpy((char*)t.data, (str2)); - - t.datalen = strlen((char*)t.data); - - std::cout << "size of datatuple:\t" << sizeof(datatuple) << std::endl; - std::cout << "keylen:\t" << t.keylen << - "\tdatalen:\t" << t.datalen << - "\t" << t.key << - "\t" << t.data << - std::endl; - -} diff --git a/logserver_pers.cpp b/logserver_pers.cpp deleted file mode 100644 index 4c7f2bb..0000000 --- a/logserver_pers.cpp +++ /dev/null @@ -1,519 +0,0 @@ - - - -#include "logserver.h" -#include "datatuple.h" - -#include "logstore.h" - -#include -#include -#include -#include -#include -#include -#include - -#undef begin -#undef end -#undef try - - -//server codes -uint8_t logserver::OP_SUCCESS = 1; -uint8_t logserver::OP_FAIL = 2; -uint8_t logserver::OP_SENDING_TUPLE = 3; - -//client codes -uint8_t logserver::OP_FIND = 4; -uint8_t logserver::OP_INSERT = 5; - -uint8_t logserver::OP_DONE = 6; - -uint8_t logserver::OP_INVALID = 32; - -void *serverLoop(void *args); - -void logserver::startserver(logtable *ltable) -{ - sys_alive = true; - this->ltable = ltable; - - selcond = new pthread_cond_t; - pthread_cond_init(selcond, 0); - - //initialize threads - for(int i=0; ith_handle = new pthread_t; - struct pthread_data *worker_data = new pthread_data; - worker_th->data = worker_data; - - worker_data->idleth_queue = &idleth_queue; - worker_data->ready_queue = &ready_queue; - worker_data->work_queue = &work_queue; - - worker_data->qlock = qlock; - - worker_data->selcond = selcond; - - worker_data->th_cond = new pthread_cond_t; - pthread_cond_init(worker_data->th_cond,0); - - worker_data->th_mut = new pthread_mutex_t; - pthread_mutex_init(worker_data->th_mut,0); - - worker_data->workitem = new int; - *(worker_data->workitem) = -1; - - worker_data->table_lock = lsmlock; - - worker_data->ltable = ltable; - - worker_data->sys_alive = &sys_alive; - - pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th); - - idleth_queue.push(*worker_th); - - - } - - - - //start server socket - sdata = new serverth_data; - sdata->server_socket = &serversocket; - sdata->server_port = server_port; - sdata->idleth_queue = &idleth_queue; - sdata->ready_queue = &ready_queue; - sdata->selcond = selcond; - sdata->qlock = qlock; - - pthread_create(&server_thread, 0, serverLoop, sdata); - - //start monitoring loop - eventLoop(); - -} - -void logserver::stopserver() -{ - //close the server socket - //stops receiving data on the server socket - shutdown(serversocket, 0); - - //wait for all threads to be idle - while(idleth_queue.size() != nthreads) - sleep(1); - - //set the system running flag to false - sys_alive = false; - for(int i=0; idata->th_mut); - pthread_cond_signal(idle_th->data->th_cond); - pthread_mutex_unlock(idle_th->data->th_mut); - //wait for it to join - pthread_join(*(idle_th->th_handle), 0); - //free the thread variables - pthread_cond_destroy(idle_th->data->th_cond); - delete idle_th->data->th_cond; - delete idle_th->data->th_mut; - delete idle_th->data->workitem; - delete idle_th->data; - delete idle_th->th_handle; - } - - th_list.clear(); - - //close(serversocket); - - return; -} - -void logserver::eventLoop() -{ - - fd_set readfs; - std::vector sel_list; - - int maxfd; - - struct timeval Timeout; - struct timespec ts; - - while(true) - { - //clear readset - FD_ZERO(&readfs); - maxfd = -1; - - ts.tv_nsec = 250000; //nanosec - ts.tv_sec = 0; - - //Timeout.tv_usec = 250; /* microseconds */ - //Timeout.tv_sec = 0; /* seconds */ - - //update select set - pthread_mutex_lock(qlock); - - while(ready_queue.size() == 0) - { - pthread_cond_wait(selcond, qlock); - //pthread_cond_timedwait(selcond, qlock, &ts); - //printf("awoke\n"); - } - - //new connections + processed conns are in ready_queue - //add them to select list - while(ready_queue.size() > 0) - { - sel_list.push_back(ready_queue.front()); - ready_queue.pop(); - } - pthread_mutex_unlock(qlock); - - //ready select set - for(std::vector::const_iterator itr=sel_list.begin(); - itr != sel_list.end(); itr++) - { - if(maxfd < *itr) - maxfd = *itr; - FD_SET(*itr, &readfs); - } - - //select events - int sel_res = select(maxfd+1, &readfs, NULL, NULL, NULL);// &Timeout); - //printf("sel_res %d %d\n", sel_res, errno); - //fflush(stdout); - //job assignment to threads - - for(int i=0; i 0) //assign the job to an indle thread - { - pthread_item idle_th = idleth_queue.front(); - idleth_queue.pop(); - - //wake up the thread to do work - pthread_mutex_lock(idle_th.data->th_mut); - //set the job of the idle thread - *(idle_th.data->workitem) = currsock; - pthread_cond_signal(idle_th.data->th_cond); - pthread_mutex_unlock(idle_th.data->th_mut); - } - else - { - //insert the given element to the work queue - work_queue.push(currsock); - printf("work queue size:\t%d\n", work_queue.size()); - } - - //remove from the sel_list - sel_list.erase(sel_list.begin()+i); - i--; - - pthread_mutex_unlock(qlock); - - } - } - } - -} - -void *serverLoop(void *args) -{ - - serverth_data *sdata = (serverth_data*)args; - - int sockfd; //socket descriptor - struct sockaddr_in serv_addr; - struct sockaddr_in cli_addr; - int newsockfd; //newly created - socklen_t clilen = sizeof(cli_addr); - - - //open a socket - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) - { - printf("ERROR opening socket\n"); - return 0; - } - - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); - serv_addr.sin_port = htons(sdata->server_port); - - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) - { - printf("ERROR on binding.\n"); - return 0; - } - - //start listening on the server socket - //second arg is the max number of coonections waiting in queue - if(listen(sockfd,SOMAXCONN)==-1) - { - printf("ERROR on listen.\n"); - return 0; - } - - printf("LSM Server listenning...\n"); - - *(sdata->server_socket) = sockfd; - int flag, result; - while(true) - { - newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); - if (newsockfd < 0) - { - printf("ERROR on accept.\n"); - return 0; // we probably want to continue instead of return here (when not debugging) - } - - flag = 1; - result = setsockopt(newsockfd, /* socket affected */ - IPPROTO_TCP, /* set option at TCP level */ - TCP_NODELAY, /* name of option */ - (char *) &flag, /* the cast is historical - cruft */ - sizeof(int)); /* length of option value */ - if (result < 0) - { - printf("ERROR on setting socket option TCP_NODELAY.\n"); - return 0; - } - - char clientip[20]; - inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20); - printf("Connection from:\t%s\n", clientip); - - //printf("Number of idle threads %d\n", idleth_queue.size()); - - pthread_mutex_lock(sdata->qlock); - - //insert the given element to the ready queue - sdata->ready_queue->push(newsockfd); - - if(sdata->ready_queue->size() == 1) //signal the event loop - pthread_cond_signal(sdata->selcond); - - pthread_mutex_unlock(sdata->qlock); - - } - - -} - -inline void readfromsocket(int sockd, byte *buf, int count) -{ - - int n = 0; - while( n < count ) - { - n += read( sockd, buf + n, count - n); - } - -} - -inline void writetosocket(int sockd, byte *buf, int count) -{ - int n = 0; - while( n < count ) - { - n += write( sockd, buf + n, count - n); - } -} - - - - - -void * thread_work_fn( void * args) -{ - pthread_item * item = (pthread_item *) args; - - pthread_mutex_lock(item->data->th_mut); - while(true) - { - while(*(item->data->workitem) == -1) - { - if(!*(item->data->sys_alive)) - break; - pthread_cond_wait(item->data->th_cond, item->data->th_mut); //wait for job - } - - - if(!*(item->data->sys_alive)) - { - //printf("thread quitted.\n"); - break; - } - - //step 1: read the opcode - uint8_t opcode; - ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t)); - assert( n == sizeof(uint8_t)); - assert( opcode < logserver::OP_INVALID ); - - if( opcode == logserver::OP_DONE ) //close the conn on failure - { - pthread_mutex_lock(item->data->qlock); - printf("client done. conn closed. (%d, %d, %d, %d)\n", - n, errno, *(item->data->workitem), item->data->work_queue->size()); - close(*(item->data->workitem)); - - if(item->data->work_queue->size() > 0) - { - int new_work = item->data->work_queue->front(); - item->data->work_queue->pop(); - printf("work queue size:\t%d\n", item->data->work_queue->size()); - *(item->data->workitem) = new_work; - } - else - { - //set work to -1 - *(item->data->workitem) = -1; - //add self to idle queue - item->data->idleth_queue->push(*item); - } - - pthread_cond_signal(item->data->selcond); - - pthread_mutex_unlock(item->data->qlock); - continue; - } - - - //step 2: read the tuple from client - datatuple tuple; - tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t)); - - //read the key length - n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - //read the data length - n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - - //read the key - tuple.key = (byte*) malloc(*tuple.keylen); - readfromsocket(*(item->data->workitem), (byte*) tuple.key, *tuple.keylen); - //read the data - if(!tuple.isDelete() && opcode != logserver::OP_FIND) - { - tuple.data = (byte*) malloc(*tuple.datalen); - readfromsocket(*(item->data->workitem), (byte*) tuple.data, *tuple.datalen); - } - else - tuple.data = 0; - - //step 3: process the tuple - //pthread_mutex_lock(item->data->table_lock); - //readlock(item->data->table_lock,0); - - if(opcode == logserver::OP_INSERT) - { - //insert/update/delete - item->data->ltable->insertTuple(tuple); - //unlock the lsmlock - //pthread_mutex_unlock(item->data->table_lock); - //unlock(item->data->table_lock); - //step 4: send response - uint8_t rcode = logserver::OP_SUCCESS; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - - } - else if(opcode == logserver::OP_FIND) - { - //find the tuple - datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen); - //unlock the lsmlock - //pthread_mutex_unlock(item->data->table_lock); - //unlock(item->data->table_lock); - - if(dt == 0) //tuple deleted - { - dt = (datatuple*) malloc(sizeof(datatuple)); - dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen); - *dt->keylen = *tuple.keylen; - dt->datalen = dt->keylen + 1; - dt->key = (datatuple::key_t) (dt->datalen+1); - memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen); - dt->setDelete(); - } - - //send the reply code - uint8_t rcode = logserver::OP_SENDING_TUPLE; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - - //send the tuple - writetosocket(*(item->data->workitem), (byte*) dt->keylen, dt->byte_length()); - - //free datatuple - free(dt->keylen); - free(dt); - } - - //close the socket - //close(*(item->data->workitem)); - - //free the tuple - free(tuple.keylen); - free(tuple.datalen); - free(tuple.key); - free(tuple.data); - - //printf("socket %d: work completed.\n", *(item->data->workitem)); - - pthread_mutex_lock(item->data->qlock); - - //add conn desc to ready queue - item->data->ready_queue->push(*(item->data->workitem)); - //printf("ready queue size: %d sock(%d)\n", item->data->ready_queue->size(), *(item->data->workitem)); - if(item->data->ready_queue->size() == 1) //signal the event loop - pthread_cond_signal(item->data->selcond); - - if(item->data->work_queue->size() > 0) - { - int new_work = item->data->work_queue->front(); - item->data->work_queue->pop(); - printf("work queue size:\t%d\n", item->data->work_queue->size()); - *(item->data->workitem) = new_work; - } - else - { - //set work to -1 - *(item->data->workitem) = -1; - //add self to idle queue - item->data->idleth_queue->push(*item); - } - - pthread_mutex_unlock(item->data->qlock); - - } - pthread_mutex_unlock(item->data->th_mut); - - -} - - diff --git a/logserver_pers.h b/logserver_pers.h deleted file mode 100644 index 94a10b7..0000000 --- a/logserver_pers.h +++ /dev/null @@ -1,163 +0,0 @@ -#ifndef _LOGSERVER_H_ -#define _LOGSERVER_H_ - - -#include -#include - -//#include "logstore.h" - -#include "datatuple.h" - - - -#include -#include - -#undef begin -#undef try -#undef end - -class logtable; - - - -struct pthread_item; - -struct pthread_data { - std::queue *idleth_queue; - std::queue *ready_queue; - std::queue *work_queue; - pthread_mutex_t * qlock; - - pthread_cond_t *selcond; - - pthread_cond_t * th_cond; - pthread_mutex_t * th_mut; - - int *workitem; //id of the socket to work - - //pthread_mutex_t * table_lock; - rwl *table_lock; - logtable *ltable; - bool *sys_alive; -}; - -struct pthread_item{ - pthread_t * th_handle; - pthread_data *data; -}; - - -//struct work_item -//{ -// int sockd; //socket id -// datatuple in_tuple; //request -// datatuple out_tuple; //response -//}; - -struct serverth_data -{ - int *server_socket; - int server_port; - std::queue *idleth_queue; - std::queue *ready_queue; - - pthread_cond_t *selcond; - - pthread_mutex_t *qlock; - - - -}; - -void * thread_work_fn( void *); - -class logserver -{ -public: - //server codes - static uint8_t OP_SUCCESS; - static uint8_t OP_FAIL; - static uint8_t OP_SENDING_TUPLE; - - //client codes - static uint8_t OP_FIND; - static uint8_t OP_INSERT; - - static uint8_t OP_DONE; - - static uint8_t OP_INVALID; - -public: - logserver(int nthreads, int server_port){ - this->nthreads = nthreads; - this->server_port = server_port; - //lsmlock = new pthread_mutex_t; - //pthread_mutex_init(lsmlock,0); - - lsmlock = initlock(); - - qlock = new pthread_mutex_t; - pthread_mutex_init(qlock,0); - - ltable = 0; - - } - - ~logserver() - { - //delete lsmlock; - deletelock(lsmlock); - delete qlock; - } - - void startserver(logtable *ltable); - - void stopserver(); - - -public: - -private: - - //main loop of server - //accept connections, assign jobs to threads - //void dispatchLoop(); - - void eventLoop(); - - -private: - - int server_port; - - int nthreads; - - bool sys_alive; - - int serversocket; //server socket file descriptor - - //ccqueue conn_queue; //list of active connections (socket list) - - //ccqueue idleth_queue; //list of idle threads - - std::queue ready_queue; //connections to go inside select - std::queue work_queue; //connections to be processed by worker threads - std::queue idleth_queue; - pthread_mutex_t *qlock; - - pthread_t server_thread; - serverth_data *sdata; - pthread_cond_t *selcond; //server loop cond - - std::vector th_list; // list of threads - - rwl *lsmlock; //lock for using lsm table - - logtable *ltable; - -}; - - -#endif diff --git a/logserver_simple.cpp b/logserver_simple.cpp deleted file mode 100644 index 56f9ceb..0000000 --- a/logserver_simple.cpp +++ /dev/null @@ -1,409 +0,0 @@ - - - -#include "logserver.h" -#include "datatuple.h" - -#include "logstore.h" - -#include -#include -#include -#include -#include - -#undef begin -#undef end -#undef try - - -//server codes -uint8_t logserver::OP_SUCCESS = 1; -uint8_t logserver::OP_FAIL = 2; -uint8_t logserver::OP_SENDING_TUPLE = 3; - -//client codes -uint8_t logserver::OP_FIND = 4; -uint8_t logserver::OP_INSERT = 5; - -uint8_t logserver::OP_INVALID = 32; - - -void logserver::startserver(logtable *ltable) -{ - sys_alive = true; - this->ltable = ltable; - //initialize threads - for(int i=0; ith_handle = new pthread_t; - struct pthread_data *worker_data = new pthread_data; - worker_th->data = worker_data; - - worker_data->idleth_queue = &idleth_queue; - - worker_data->conn_queue = &conn_queue; - - worker_data->qlock = qlock; - - worker_data->th_cond = new pthread_cond_t; - pthread_cond_init(worker_data->th_cond,0); - - worker_data->th_mut = new pthread_mutex_t; - pthread_mutex_init(worker_data->th_mut,0); - - worker_data->workitem = new int; - *(worker_data->workitem) = -1; - - worker_data->table_lock = lsmlock; - - worker_data->ltable = ltable; - - worker_data->sys_alive = &sys_alive; - - pthread_create(worker_th->th_handle, 0, thread_work_fn, worker_th); - - idleth_queue.push(*worker_th); - - - } - - dispatchLoop(); - -} - -void logserver::stopserver() -{ - //close the server socket - //stops receiving data on the server socket - shutdown(serversocket, 0); - - //wait for all threads to be idle - while(idleth_queue.size() != nthreads) - sleep(1); - - //set the system running flag to false - sys_alive = false; - for(int i=0; idata->th_mut); - pthread_cond_signal(idle_th->data->th_cond); - pthread_mutex_unlock(idle_th->data->th_mut); - //wait for it to join - pthread_join(*(idle_th->th_handle), 0); - //free the thread variables - pthread_cond_destroy(idle_th->data->th_cond); - delete idle_th->data->th_cond; - delete idle_th->data->th_mut; - delete idle_th->data->workitem; - delete idle_th->data; - delete idle_th->th_handle; - } - - th_list.clear(); - - return; -} - -void logserver::dispatchLoop() -{ - - int sockfd; //socket descriptor - struct sockaddr_in serv_addr; - struct sockaddr_in cli_addr; - int newsockfd; //newly created - socklen_t clilen = sizeof(cli_addr); - - - //open a socket - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) - { - printf("ERROR opening socket\n"); - return; - } - - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_addr.s_addr = htonl(INADDR_ANY); - serv_addr.sin_port = htons(server_port); - - if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) - { - printf("ERROR on binding.\n"); - return; - } - - //start listening on the server socket - //second arg is the max number of coonections waiting in queue - if(listen(sockfd,SOMAXCONN)==-1) - { - printf("ERROR on listen.\n"); - return; - } - - printf("LSM Server listenning...\n"); - - serversocket = sockfd; - int flag, result; - while(true) - { - newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); - if (newsockfd < 0) - { - printf("ERROR on accept.\n"); - return; // we probably want to continue instead of return here (when not debugging) - } - - flag = 1; - result = setsockopt(newsockfd, /* socket affected */ - IPPROTO_TCP, /* set option at TCP level */ - TCP_NODELAY, /* name of option */ - (char *) &flag, /* the cast is historical - cruft */ - sizeof(int)); /* length of option value */ - if (result < 0) - { - printf("ERROR on setting socket option TCP_NODELAY.\n"); - return; - } - - char clientip[20]; - inet_ntop(AF_INET, (void*) &(cli_addr.sin_addr), clientip, 20); - //printf("Connection from:\t%s\n", clientip); - - //printf("Number of idle threads %d\n", idleth_queue.size()); - - pthread_mutex_lock(qlock); - - if(idleth_queue.size() > 0) - { - pthread_item idle_th = idleth_queue.front(); - idleth_queue.pop(); - - //wake up the thread to do work - pthread_mutex_lock(idle_th.data->th_mut); - //set the job of the idle thread - *(idle_th.data->workitem) = newsockfd; - pthread_cond_signal(idle_th.data->th_cond); - pthread_mutex_unlock(idle_th.data->th_mut); - } - else - { - //insert the given element to the queue - conn_queue.push(newsockfd); - //printf("Number of queued connections:\t%d\n", conn_queue.size()); - } - - pthread_mutex_unlock(qlock); - - /* - try - { - - pthread_item idle_th = idleth_queue.pop(); - //wake up the thread to do work - pthread_mutex_lock(idle_th.data->th_mut); - //set the job of the idle thread - *(idle_th.data->workitem) = newsockfd; - pthread_cond_signal(idle_th.data->th_cond); - pthread_mutex_unlock(idle_th.data->th_mut); - - } - catch(int empty_exception) - { - //insert the given element to the queue - conn_queue.push(newsockfd); - //printf("Number of queued connections:\t%d\n", conn_queue.size()); - } - */ - } - - -} - -inline void readfromsocket(int sockd, byte *buf, int count) -{ - - int n = 0; - while( n < count ) - { - n += read( sockd, buf + n, count - n); - } - -} - -inline void writetosocket(int sockd, byte *buf, int count) -{ - int n = 0; - while( n < count ) - { - n += write( sockd, buf + n, count - n); - } -} - - - - - -void * thread_work_fn( void * args) -{ - pthread_item * item = (pthread_item *) args; - - pthread_mutex_lock(item->data->th_mut); - while(true) - { - while(*(item->data->workitem) == -1) - { - if(!*(item->data->sys_alive)) - break; - pthread_cond_wait(item->data->th_cond, item->data->th_mut); //wait for job - } - - - if(!*(item->data->sys_alive)) - { - //printf("thread quitted.\n"); - break; - } - - //step 1: read the opcode - uint8_t opcode; - ssize_t n = read(*(item->data->workitem), &opcode, sizeof(uint8_t)); - assert( n == sizeof(uint8_t)); - assert( opcode < logserver::OP_INVALID ); - - //step 2: read the tuple from client - datatuple tuple; - tuple.keylen = (uint32_t*)malloc(sizeof(uint32_t)); - tuple.datalen = (uint32_t*)malloc(sizeof(uint32_t)); - - //read the key length - n = read(*(item->data->workitem), tuple.keylen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - //read the data length - n = read(*(item->data->workitem), tuple.datalen, sizeof(uint32_t)); - assert( n == sizeof(uint32_t)); - - //read the key - tuple.key = (byte*) malloc(*tuple.keylen); - readfromsocket(*(item->data->workitem), (byte*) tuple.key, *tuple.keylen); - //read the data - if(!tuple.isDelete() && opcode != logserver::OP_FIND) - { - tuple.data = (byte*) malloc(*tuple.datalen); - readfromsocket(*(item->data->workitem), (byte*) tuple.data, *tuple.datalen); - } - else - tuple.data = 0; - - //step 3: process the tuple - //pthread_mutex_lock(item->data->table_lock); - //readlock(item->data->table_lock,0); - - if(opcode == logserver::OP_INSERT) - { - //insert/update/delete - item->data->ltable->insertTuple(tuple); - //unlock the lsmlock - //pthread_mutex_unlock(item->data->table_lock); - //unlock(item->data->table_lock); - //step 4: send response - uint8_t rcode = logserver::OP_SUCCESS; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - - } - else if(opcode == logserver::OP_FIND) - { - //find the tuple - datatuple *dt = item->data->ltable->findTuple(-1, tuple.key, *tuple.keylen); - //unlock the lsmlock - //pthread_mutex_unlock(item->data->table_lock); - //unlock(item->data->table_lock); - - if(dt == 0) //tuple deleted - { - dt = (datatuple*) malloc(sizeof(datatuple)); - dt->keylen = (uint32_t*) malloc(2*sizeof(uint32_t) + *tuple.keylen); - *dt->keylen = *tuple.keylen; - dt->datalen = dt->keylen + 1; - dt->key = (datatuple::key_t) (dt->datalen+1); - memcpy((byte*) dt->key, (byte*) tuple.key, *tuple.keylen); - dt->setDelete(); - } - - //send the reply code - uint8_t rcode = logserver::OP_SENDING_TUPLE; - n = write(*(item->data->workitem), &rcode, sizeof(uint8_t)); - assert(n == sizeof(uint8_t)); - - //send the tuple - writetosocket(*(item->data->workitem), (byte*) dt->keylen, dt->byte_length()); - - //free datatuple - free(dt->keylen); - free(dt); - } - - //close the socket - close(*(item->data->workitem)); - - //free the tuple - free(tuple.keylen); - free(tuple.datalen); - free(tuple.key); - free(tuple.data); - - //printf("socket %d: work completed.\n", *(item->data->workitem)); - - pthread_mutex_lock(item->data->qlock); - - if(item->data->conn_queue->size() > 0) - { - int new_work = item->data->conn_queue->front(); - item->data->conn_queue->pop(); - *(item->data->workitem) = new_work; - } - else - { - //set work to -1 - *(item->data->workitem) = -1; - //add self to idle queue - item->data->idleth_queue->push(*item); - } - - pthread_mutex_unlock(item->data->qlock); - - /* - //check if there is new work this thread can do - try - { - int new_work = item->data->conn_queue->pop(); - *(item->data->workitem) = new_work; //set new work - //printf("socket %d: new work found.\n", *(item->data->workitem)); - } - catch(int empty_exception) - { - //printf("socket %d: no new work found.\n", *(item->data->workitem)); - //set work to -1 - *(item->data->workitem) = -1; - //add self to idle queue - item->data->idleth_queue->push(*item); - - } - */ - - } - pthread_mutex_unlock(item->data->th_mut); - - -} - - diff --git a/logserver_simple.h b/logserver_simple.h deleted file mode 100644 index 48fbea6..0000000 --- a/logserver_simple.h +++ /dev/null @@ -1,198 +0,0 @@ -#ifndef _LOGSERVER_H_ -#define _LOGSERVER_H_ - - -#include -#include - -//#include "logstore.h" - -#include "datatuple.h" - - - -#include -#include - -#undef begin -#undef try -#undef end - -class logtable; - -template -class ccqueue -{ -public: - ccqueue() - { - qmut = new pthread_mutex_t; - pthread_mutex_init(qmut,0); - } - - int size() - { - pthread_mutex_lock(qmut); - int qsize = m_queue.size(); - pthread_mutex_unlock(qmut); - return qsize; - } - - //inserts a copy of the given element to the queue - void push(const T &item) - { - pthread_mutex_lock(qmut); - m_queue.push(item); - pthread_mutex_unlock(qmut); - return; - } - - //returns a copy of the next element - //deletes the copy in the queue - //throws an exception with -1 on empty queue - T pop() throw (int) - { - pthread_mutex_lock(qmut); - - if(m_queue.size() > 0) - { - T item = m_queue.front(); - m_queue.pop(); - pthread_mutex_unlock(qmut); - return item; - } - - - pthread_mutex_unlock(qmut); - throw(-1); - - - } - - - - ~ccqueue() - { - delete qmut; - } - -private: - - std::queue m_queue; - - pthread_mutex_t *qmut; - -}; - -struct pthread_item; - -struct pthread_data { - std::queue *idleth_queue; - std::queue *conn_queue; - pthread_mutex_t * qlock; - - pthread_cond_t * th_cond; - pthread_mutex_t * th_mut; - - int *workitem; //id of the socket to work - - //pthread_mutex_t * table_lock; - rwl *table_lock; - logtable *ltable; - bool *sys_alive; -}; - -struct pthread_item{ - pthread_t * th_handle; - pthread_data *data; -}; - -struct work_item -{ - int sockd; //socket id - datatuple in_tuple; //request - datatuple out_tuple; //response -}; - - -void * thread_work_fn( void *); - -class logserver -{ -public: - //server codes - static uint8_t OP_SUCCESS; - static uint8_t OP_FAIL; - static uint8_t OP_SENDING_TUPLE; - - //client codes - static uint8_t OP_FIND; - static uint8_t OP_INSERT; - - static uint8_t OP_INVALID; - -public: - logserver(int nthreads, int server_port){ - this->nthreads = nthreads; - this->server_port = server_port; - //lsmlock = new pthread_mutex_t; - //pthread_mutex_init(lsmlock,0); - - lsmlock = initlock(); - - qlock = new pthread_mutex_t; - pthread_mutex_init(qlock,0); - - ltable = 0; - - } - - ~logserver() - { - //delete lsmlock; - deletelock(lsmlock); - delete qlock; - } - - void startserver(logtable *ltable); - - void stopserver(); - - -public: - -private: - - //main loop of server - //accept connections, assign jobs to threads - void dispatchLoop(); - - -private: - - int server_port; - - int nthreads; - - bool sys_alive; - - int serversocket; //server socket file descriptor - - //ccqueue conn_queue; //list of active connections (socket list) - - //ccqueue idleth_queue; //list of idle threads - - std::queue conn_queue; - std::queue idleth_queue; - pthread_mutex_t *qlock; - - std::vector th_list; // list of threads - - rwl *lsmlock; //lock for using lsm table - - logtable *ltable; - -}; - - -#endif diff --git a/check_server.cpp b/server.cpp similarity index 100% rename from check_server.cpp rename to server.cpp