diff --git a/c_src/bdberl_drv.c b/c_src/bdberl_drv.c index 82ef185..d9e4acb 100644 --- a/c_src/bdberl_drv.c +++ b/c_src/bdberl_drv.c @@ -528,12 +528,14 @@ static int bdberl_drv_control(ErlDrvData handle, unsigned int cmd, RETURN_INT(ERROR_TXN_OPEN, outbuf); } - // Inbuf is <> - unsigned flags = UNPACK_INT(inbuf, 0); + // Setup async command and schedule it on the txns threadpool + d->async_op = cmd; + d->async_flags = UNPACK_INT(inbuf, 0); + d->async_pool = G_TPOOL_TXNS; + bdberl_tpool_run(d->async_pool, &do_async_txnop, d, 0, &d->async_job); // Outbuf is <> - int rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), flags); - RETURN_INT(rc, outbuf); + RETURN_INT(0, outbuf); } case CMD_TXN_COMMIT: case CMD_TXN_ABORT: @@ -1172,20 +1174,23 @@ static void do_async_txnop(void* arg) { PortData* d = (PortData*)arg; - // Execute the actual commit/abort + // Execute the actual begin/commit/abort int rc = 0; - if (d->async_op == CMD_TXN_COMMIT) + if (d->async_op == CMD_TXN_BEGIN) + { + rc = G_DB_ENV->txn_begin(G_DB_ENV, 0, &(d->txn), d->async_flags); + } + else if (d->async_op == CMD_TXN_COMMIT) { rc = d->txn->commit(d->txn, d->async_flags); + d->txn = 0; } else { rc = d->txn->abort(d->txn); + d->txn = 0; } - // The transaction is now invalid, regardless of the outcome. - d->txn = 0; - async_cleanup_and_send_rc(d, rc); } diff --git a/src/bdberl.erl b/src/bdberl.erl index 1323032..2a73fe5 100644 --- a/src/bdberl.erl +++ b/src/bdberl.erl @@ -326,11 +326,17 @@ txn_begin(Opts) -> Cmd = <>, <> = erlang:port_control(get_port(), ?CMD_TXN_BEGIN, Cmd), case decode_rc(Result) of - ok -> ok; - Error -> {error, Error} + ok -> + receive + ok -> ok; + {error, Reason} -> {error, decode_rc(Reason)} + end; + Error -> + {error, Error} end. + %%-------------------------------------------------------------------- %% @doc %% Commit a transaction with default options.