Multiple changes.

- Alter the behavior of #clear, so it doesn't destroy collection
   environments, but just empties them.
 - Add #drop, which explicitly -does- destroy a collection environment.
 - Run all cursor methods through rb_protect, to ensure proper
   cursor cleanup in the event of an exception mid iteration.
 - Fix the block form of collections to support multiple scopes.
 - Switching to a collection now automatically creates its environment.
 - Add include? and has_key?, for presence checks without allocating
   value memory or requiring deserialization.

FossilOrigin-Name: e1ed7bf613981607bb3b57ce7dd3e58b94ea3046e140b6dc37440da8d2909f94
This commit is contained in:
Mahlon E. Smith 2021-04-08 22:27:47 +00:00
parent 1e04b12efa
commit ca34f9fdc5
7 changed files with 365 additions and 114 deletions

View file

@ -6,7 +6,10 @@
*/
#define UNWRAP_DB( val, db ) \
rmdbx_db_t *db; \
TypedData_Get_Struct( val, rmdbx_db_t, &rmdbx_db_data, db );
TypedData_Get_Struct( val, rmdbx_db_t, &rmdbx_db_data, db )
#define CHECK_HANDLE \
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." )
VALUE rmdbx_cDatabase;
@ -160,7 +163,7 @@ rmdbx_open_env( VALUE self )
void
rmdbx_open_cursor( rmdbx_db_t *db )
{
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." );
CHECK_HANDLE;
if ( ! db->txn ) rb_raise( rmdbx_eDatabaseError, "No snapshot or transaction currently open." );
int rc = mdbx_cursor_open( db->txn, db->dbi, &db->cursor );
@ -215,11 +218,11 @@ rmdbx_close_txn( rmdbx_db_t *db, int txnflag )
{
if ( ! db->txn || db->state.retain_txn > -1 ) return;
switch ( txnflag ) {
case RMDBX_TXN_COMMIT:
mdbx_txn_commit( db->txn );
default:
mdbx_txn_abort( db->txn );
if ( txnflag == RMDBX_TXN_COMMIT ) {
mdbx_txn_commit( db->txn );
}
else {
mdbx_txn_abort( db->txn );
}
db->txn = 0;
@ -239,6 +242,7 @@ VALUE
rmdbx_rb_opentxn( VALUE self, VALUE mode )
{
UNWRAP_DB( self, db );
CHECK_HANDLE;
rmdbx_open_txn( db, RTEST(mode) ? MDBX_TXN_READWRITE : MDBX_TXN_RDONLY );
db->state.retain_txn = RTEST(mode) ? 1 : 0;
@ -273,13 +277,52 @@ rmdbx_rb_closetxn( VALUE self, VALUE write )
*
* Empty the current collection on disk. If collections are not enabled
* or the database handle is set to the top-level (main) db - this
* deletes *all records* from the database. This is not recoverable!
* deletes *all records* from the database.
*/
VALUE
rmdbx_clear( VALUE self )
{
	UNWRAP_DB( self, db );

	rmdbx_open_txn( db, MDBX_TXN_READWRITE );

	/* Passing 'false' empties the collection's records without
	   deleting the collection itself. */
	int rc = mdbx_drop( db->txn, db->dbi, false );

	if ( rc != MDBX_SUCCESS ) {
		/* Roll back before raising so a failed clear doesn't leave the
		   write transaction open behind the exception. */
		rmdbx_close_txn( db, RMDBX_TXN_ROLLBACK );
		rb_raise( rmdbx_eDatabaseError, "mdbx_drop: (%d) %s", rc, mdbx_strerror(rc) );
	}

	rmdbx_close_txn( db, RMDBX_TXN_COMMIT );

	return Qnil;
}
/*
* call-seq:
* db.drop( collection ) -> db
*
* Destroy a collection. You must be in the top level database to call
* this method.
*/
VALUE
rmdbx_drop( VALUE self, VALUE name )
{
UNWRAP_DB( self, db );
/* Provide a friendlier error message if max_collections is 0. */
if ( db->settings.max_collections == 0 )
rb_raise( rmdbx_eDatabaseError, "Unable to drop collection: collections are not enabled." );
/* All transactions must be closed when dropping a database. */
if ( db->txn )
rb_raise( rmdbx_eDatabaseError, "Unable to drop collection: transaction open" );
/* A drop can only be performed from the top-level database. */
if ( db->subdb != NULL )
rb_raise( rmdbx_eDatabaseError, "Unable to drop collection: switch to top-level db first" );
name = rb_funcall( name, rb_intern("to_s"), 0 );
db->subdb = StringValueCStr( name );
rmdbx_open_txn( db, MDBX_TXN_READWRITE );
int rc = mdbx_drop( db->txn, db->dbi, true );
@ -288,10 +331,10 @@ rmdbx_clear( VALUE self )
rmdbx_close_txn( db, RMDBX_TXN_COMMIT );
/* Refresh the environment handles. */
rmdbx_open_env( self );
/* Reset the current collection to the top level. */
db->subdb = NULL;
return Qnil;
return self;
}
@ -348,6 +391,26 @@ rmdbx_deserialize( VALUE self, VALUE val )
}
/*
 * Walk the handle's cursor (db->cursor) from the first record onward,
 * yielding every key to the calling block as a Ruby string.  The
 * cursor is neither opened nor closed here.  Returns self.
 */
VALUE
rmdbx_each_key_i( VALUE self )
{
	UNWRAP_DB( self, db );
	MDBX_val key, data;

	/* Start at the first record, then keep stepping forward until the
	   cursor stops reporting success. */
	int rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST );
	while ( rc == MDBX_SUCCESS ) {
		rb_yield( rb_str_new( key.iov_base, key.iov_len ) );
		rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT );
	}

	return self;
}
/* call-seq:
* db.each_key {|key| block } => self
*
@ -358,20 +421,41 @@ VALUE
rmdbx_each_key( VALUE self )
{
UNWRAP_DB( self, db );
MDBX_val key, data;
int state;
CHECK_HANDLE;
rmdbx_open_cursor( db );
RETURN_ENUMERATOR( self, 0, 0 );
if ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST ) == MDBX_SUCCESS ) {
rb_yield( rb_str_new( key.iov_base, key.iov_len ) );
while ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT ) == MDBX_SUCCESS ) {
rb_yield( rb_str_new( key.iov_base, key.iov_len ) );
}
}
rb_protect( rmdbx_each_key_i, self, &state );
mdbx_cursor_close( db->cursor );
db->cursor = NULL;
if ( state ) rb_jump_tag( state );
return self;
}
/*
 * Walk the handle's cursor (db->cursor) from the first record onward,
 * yielding every value to the calling block after passing it through
 * rmdbx_deserialize().  The cursor is neither opened nor closed here.
 * Returns self.
 */
VALUE
rmdbx_each_value_i( VALUE self )
{
	UNWRAP_DB( self, db );
	MDBX_val key, data;

	/* Start at the first record, then keep stepping forward until the
	   cursor stops reporting success. */
	int rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST );
	while ( rc == MDBX_SUCCESS ) {
		VALUE raw = rb_str_new( data.iov_base, data.iov_len );
		rb_yield( rmdbx_deserialize( self, raw ) );
		rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT );
	}

	return self;
}
@ -386,23 +470,43 @@ VALUE
rmdbx_each_value( VALUE self )
{
UNWRAP_DB( self, db );
MDBX_val key, data;
int state;
CHECK_HANDLE;
rmdbx_open_cursor( db );
RETURN_ENUMERATOR( self, 0, 0 );
if ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST ) == MDBX_SUCCESS ) {
VALUE rv = rb_str_new( data.iov_base, data.iov_len );
rb_yield( rmdbx_deserialize( self, rv ) );
while ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT ) == MDBX_SUCCESS ) {
rv = rb_str_new( data.iov_base, data.iov_len );
rb_yield( rmdbx_deserialize( self, rv ) );
}
}
rb_protect( rmdbx_each_value_i, self, &state );
mdbx_cursor_close( db->cursor );
db->cursor = NULL;
if ( state ) rb_jump_tag( state );
return self;
}
/*
 * Walk the handle's cursor (db->cursor) from the first record onward,
 * yielding a two-element [ key, value ] array for every record.  Keys
 * are yielded as Ruby strings; values are passed through
 * rmdbx_deserialize() first.  The cursor is neither opened nor closed
 * here.  Returns self.
 */
VALUE
rmdbx_each_pair_i( VALUE self )
{
	UNWRAP_DB( self, db );
	MDBX_val key, data;

	/* Start at the first record, then keep stepping forward until the
	   cursor stops reporting success. */
	int rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST );
	while ( rc == MDBX_SUCCESS ) {
		VALUE pair_key = rb_str_new( key.iov_base, key.iov_len );
		VALUE pair_raw = rb_str_new( data.iov_base, data.iov_len );
		VALUE pair_val = rmdbx_deserialize( self, pair_raw );

		rb_yield( rb_assoc_new( pair_key, pair_val ) );
		rc = mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT );
	}

	return self;
}
@ -417,29 +521,24 @@ VALUE
rmdbx_each_pair( VALUE self )
{
UNWRAP_DB( self, db );
MDBX_val key, data;
int state;
CHECK_HANDLE;
rmdbx_open_cursor( db );
RETURN_ENUMERATOR( self, 0, 0 );
if ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_FIRST ) == MDBX_SUCCESS ) {
VALUE rkey = rb_str_new( key.iov_base, key.iov_len );
VALUE rval = rb_str_new( data.iov_base, data.iov_len );
rb_yield( rb_assoc_new( rkey, rmdbx_deserialize( self, rval ) ) );
while ( mdbx_cursor_get( db->cursor, &key, &data, MDBX_NEXT ) == MDBX_SUCCESS ) {
rkey = rb_str_new( key.iov_base, key.iov_len );
rval = rb_str_new( data.iov_base, data.iov_len );
rb_yield( rb_assoc_new( rkey, rmdbx_deserialize( self, rval ) ) );
}
}
rb_protect( rmdbx_each_pair_i, self, &state );
mdbx_cursor_close( db->cursor );
db->cursor = NULL;
if ( state ) rb_jump_tag( state );
return self;
}
/* call-seq:
* db.length -> Integer
*
@ -451,7 +550,7 @@ rmdbx_length( VALUE self )
UNWRAP_DB( self, db );
MDBX_stat mstat;
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." );
CHECK_HANDLE;
rmdbx_open_txn( db, MDBX_TXN_RDONLY );
int rc = mdbx_dbi_stat( db->txn, db->dbi, &mstat, sizeof(mstat) );
@ -465,6 +564,39 @@ rmdbx_length( VALUE self )
}
/* call-seq:
 *    db.include?( 'key' ) => bool
 *
 * Check whether +key+ is present in the current collection.  Only the
 * lookup status is examined; the stored value is never converted to
 * a Ruby object.  Any status other than success or not-found closes
 * the handle and raises a DatabaseError.
 */
VALUE
rmdbx_include( VALUE self, VALUE key )
{
	UNWRAP_DB( self, db );
	CHECK_HANDLE;

	rmdbx_open_txn( db, MDBX_TXN_RDONLY );

	MDBX_val lookup = rmdbx_key_for( key );
	MDBX_val found;
	int status = mdbx_get( db->txn, db->dbi, &lookup, &found );

	/* Nothing was written, so the transaction is simply rolled back. */
	rmdbx_close_txn( db, RMDBX_TXN_ROLLBACK );

	if ( status == MDBX_SUCCESS )  return Qtrue;
	if ( status == MDBX_NOTFOUND ) return Qfalse;

	/* Unexpected failure: shut the handle down before raising. */
	rmdbx_close( self );
	rb_raise( rmdbx_eDatabaseError, "Unable to fetch key: (%d) %s", status, mdbx_strerror(status) );
}
/* call-seq:
* db[ 'key' ] => value
*
@ -476,7 +608,7 @@ rmdbx_get_val( VALUE self, VALUE key )
int rc;
UNWRAP_DB( self, db );
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." );
CHECK_HANDLE;
rmdbx_open_txn( db, MDBX_TXN_RDONLY );
MDBX_val ckey = rmdbx_key_for( key );
@ -503,7 +635,8 @@ rmdbx_get_val( VALUE self, VALUE key )
/* call-seq:
* db[ 'key' ] = value
*
* Set a single value for +key+.
* Set a single value for +key+. If the value is +nil+, the
* key is removed.
*/
VALUE
rmdbx_put_val( VALUE self, VALUE key, VALUE val )
@ -511,7 +644,7 @@ rmdbx_put_val( VALUE self, VALUE key, VALUE val )
int rc;
UNWRAP_DB( self, db );
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." );
CHECK_HANDLE;
rmdbx_open_txn( db, MDBX_TXN_READWRITE );
MDBX_val ckey = rmdbx_key_for( key );
@ -552,41 +685,32 @@ VALUE
rmdbx_stats( VALUE self )
{
UNWRAP_DB( self, db );
if ( ! db->state.open ) rb_raise( rmdbx_eDatabaseError, "Closed database." );
CHECK_HANDLE;
return rmdbx_gather_stats( db );
}
/*
* call-seq:
* db.collection -> (collection name, or nil if in main)
* db.collection( 'collection_name' ) -> db
* db.collection( nil ) -> db (main)
*
* Gets or sets the sub-database "collection" that read/write
* operations apply to.
* Passing +nil+ sets the database to the main, top-level namespace.
* If a block is passed, the collection automatically reverts to the
* prior collection when it exits.
*
* db.collection( 'collection_name' ) do
* [ ... ]
* end # reverts to the previous collection name
* Return the currently selected collection, or +nil+ if at the
* top-level.
*/
VALUE
rmdbx_get_subdb( VALUE self )
{
	UNWRAP_DB( self, db );

	/* No collection name recorded on the handle: report nil. */
	if ( db->subdb == NULL ) return Qnil;

	/* Otherwise hand back a fresh Ruby string copy of the name. */
	return rb_str_new_cstr( db->subdb );
}
/*
* Sets the current collection name for read/write operations.
*
*/
VALUE
rmdbx_set_subdb( int argc, VALUE *argv, VALUE self )
rmdbx_set_subdb( VALUE self, VALUE name )
{
UNWRAP_DB( self, db );
VALUE subdb, block;
char *prev_db = NULL;
rb_scan_args( argc, argv, "01&", &subdb, &block );
if ( argc == 0 ) {
if ( db->subdb == NULL ) return Qnil;
return rb_str_new_cstr( db->subdb );
}
/* Provide a friendlier error message if max_collections is 0. */
if ( db->settings.max_collections == 0 )
@ -596,38 +720,14 @@ rmdbx_set_subdb( int argc, VALUE *argv, VALUE self )
if ( db->txn )
rb_raise( rmdbx_eDatabaseError, "Unable to change collection: transaction open" );
/* Retain the prior database collection if a block was passed.
*/
if ( rb_block_given_p() && db->subdb != NULL ) {
prev_db = (char *) malloc( strlen(db->subdb) + 1 );
strcpy( prev_db, db->subdb );
}
db->subdb = NIL_P( name ) ? NULL : StringValueCStr( name );
if ( NIL_P(subdb) ) {
db->subdb = NULL;
}
else {
subdb = rb_funcall( subdb, rb_intern("to_s"), 0 );
db->subdb = StringValueCStr( subdb );
}
/* Reset the db handle and issue a single transaction to reify
the collection.
*/
rmdbx_close_dbi( db );
/*
FIXME: Immediate transaction write to auto-create new env?
Fetching from here at the moment causes an error if you
haven't written anything to the new collection yet.
*/
/* Revert to the previous collection after the block is done.
*/
if ( rb_block_given_p() ) {
rb_yield( self );
if ( db->subdb != prev_db ) {
db->subdb = prev_db;
rmdbx_close_dbi( db );
}
xfree( prev_db );
}
rmdbx_open_txn( db, MDBX_TXN_READWRITE );
rmdbx_close_txn( db, RMDBX_TXN_COMMIT );
return self;
}
@ -767,16 +867,18 @@ rmdbx_init_database()
rb_define_protected_method( rmdbx_cDatabase, "initialize", rmdbx_database_initialize, -1 );
rb_define_protected_method( rmdbx_cDatabase, "initialize_copy", rmdbx_init_copy, 1 );
rb_define_method( rmdbx_cDatabase, "collection", rmdbx_set_subdb, -1 );
rb_define_method( rmdbx_cDatabase, "close", rmdbx_close, 0 );
rb_define_method( rmdbx_cDatabase, "reopen", rmdbx_open_env, 0 );
rb_define_method( rmdbx_cDatabase, "closed?", rmdbx_closed_p, 0 );
rb_define_method( rmdbx_cDatabase, "in_transaction?", rmdbx_in_transaction_p, 0 );
rb_define_method( rmdbx_cDatabase, "clear", rmdbx_clear, 0 );
rb_define_method( rmdbx_cDatabase, "close", rmdbx_close, 0 );
rb_define_method( rmdbx_cDatabase, "closed?", rmdbx_closed_p, 0 );
rb_define_method( rmdbx_cDatabase, "drop", rmdbx_drop, 1 );
rb_define_method( rmdbx_cDatabase, "each_key", rmdbx_each_key, 0 );
rb_define_method( rmdbx_cDatabase, "each_value", rmdbx_each_value, 0 );
rb_define_method( rmdbx_cDatabase, "each_pair", rmdbx_each_pair, 0 );
rb_define_method( rmdbx_cDatabase, "each_value", rmdbx_each_value, 0 );
rb_define_method( rmdbx_cDatabase, "in_transaction?", rmdbx_in_transaction_p, 0 );
rb_define_method( rmdbx_cDatabase, "include?", rmdbx_include, 1 );
rb_define_method( rmdbx_cDatabase, "length", rmdbx_length, 0 );
rb_define_method( rmdbx_cDatabase, "reopen", rmdbx_open_env, 0 );
rb_define_method( rmdbx_cDatabase, "[]", rmdbx_get_val, 1 );
rb_define_method( rmdbx_cDatabase, "[]=", rmdbx_put_val, 2 );
@ -784,6 +886,10 @@ rmdbx_init_database()
rb_define_protected_method( rmdbx_cDatabase, "open_transaction", rmdbx_rb_opentxn, 1 );
rb_define_protected_method( rmdbx_cDatabase, "close_transaction", rmdbx_rb_closetxn, 1 );
/* Collection functions */
rb_define_protected_method( rmdbx_cDatabase, "get_subdb", rmdbx_get_subdb, 0 );
rb_define_protected_method( rmdbx_cDatabase, "set_subdb", rmdbx_set_subdb, 1 );
rb_define_protected_method( rmdbx_cDatabase, "raw_stats", rmdbx_stats, 0 );
rb_require( "mdbx/database" );