Checkpoint.

- Fix a couple of edge cases while switching between collections.
  - Raise error if attempting to switch collections mid-transaction.
  - Add logic for re-entrancy with long-running transactions.
  - Revert to the prior collection if passing a block to #collection.
  - Add a predicate to tell if you're currently within a long-running transaction.
  - Add separate commit/rollback to long-running transactions.

FossilOrigin-Name: 711239e6fc2f25479a26fb54805da1e5db792f97f28a8b5724e0b38eb11cdb07
This commit is contained in:
Mahlon E. Smith 2021-01-25 00:36:40 +00:00
parent ceb92fad16
commit 81ee69295c
7 changed files with 239 additions and 29 deletions

2
.pryrc
View file

@ -11,5 +11,5 @@ rescue Exception => e
e.backtrace.join( "\n\t" ) e.backtrace.join( "\n\t" )
end end
db = MDBX::Database.open( 'tmp/testdb', max_collections: 50 ) db = MDBX::Database.open( 'tmp/testdb', max_collections: 5 )

View file

@ -47,6 +47,20 @@ rmdbx_close_all( rmdbx_db_t *db )
} }
/*
* Close any open database handle. Will be automatically
* re-opened on next transaction. This is primarily useful for
* switching between subdatabases.
*/
void
rmdbx_close_dbi( rmdbx_db_t *db )
{
if ( ! db->dbi ) return;
mdbx_dbi_close( db->env, db->dbi );
db->dbi = 0;
}
/* /*
* Cleanup a previously allocated DB environment. * Cleanup a previously allocated DB environment.
*/ */
@ -86,6 +100,21 @@ rmdbx_closed_p( VALUE self )
} }
/*
* call-seq:
* db.in_transaction? #=> false
*
* Predicate: return true if a transaction (or snapshot)
* is currently open.
*/
VALUE
rmdbx_in_transaction_p( VALUE self )
{
UNWRAP_DB( self, db );
return db->txn ? Qtrue : Qfalse;
}
/* /*
* Open the DB environment handle. * Open the DB environment handle.
*/ */
@ -156,17 +185,15 @@ rmdbx_open_txn( rmdbx_db_t *db, int rwflag )
/* /*
* Close any existing database transaction. If there is no * Close any existing database transaction. If there is no
* active transaction, this is a no-op. * active transaction, this is a no-op. If there is a long
* * running transaction open, this is a no-op.
* FIXME: this needs a conditional no-op for long running
* transactions, so callers don't have to care/check
* *
* +txnflag must either be RMDBX_TXN_ROLLBACK or RMDBX_TXN_COMMIT. * +txnflag must either be RMDBX_TXN_ROLLBACK or RMDBX_TXN_COMMIT.
*/ */
void void
rmdbx_close_txn( rmdbx_db_t *db, int txnflag ) rmdbx_close_txn( rmdbx_db_t *db, int txnflag )
{ {
if ( ! db->txn ) return; if ( ! db->txn || db->state.retain_txn > -1 ) return;
switch ( txnflag ) { switch ( txnflag ) {
case RMDBX_TXN_COMMIT: case RMDBX_TXN_COMMIT:
@ -180,6 +207,46 @@ rmdbx_close_txn( rmdbx_db_t *db, int txnflag )
} }
/*
* call-seq:
* db.open_transaction( mode )
*
* Open a new long-running transaction. If +mode+ is true,
* it is opened read/write.
*
*/
VALUE
rmdbx_rb_opentxn( VALUE self, VALUE mode )
{
UNWRAP_DB( self, db );
rmdbx_open_txn( db, RTEST(mode) ? MDBX_TXN_READWRITE : MDBX_TXN_RDONLY );
db->state.retain_txn = RTEST(mode) ? 1 : 0;
return Qtrue;
}
/*
* call-seq:
* db.close_transaction( mode )
*
* Close a long-running transaction. If +write+ is true,
* the transaction is committed. Otherwise, rolled back.
*
*/
VALUE
rmdbx_rb_closetxn( VALUE self, VALUE write )
{
UNWRAP_DB( self, db );
db->state.retain_txn = -1;
rmdbx_close_txn( db, RTEST(write) ? RMDBX_TXN_COMMIT : RMDBX_TXN_ROLLBACK );
return Qtrue;
}
/* /*
* call-seq: * call-seq:
* db.clear * db.clear
@ -382,39 +449,63 @@ rmdbx_stats( VALUE self )
/* /*
* call-seq: * Gets or sets the sub-database "collection" that read/write operations apply to.
* Passing +nil+ sets the database to the main, top-level namespace.
* If a block is passed, the collection automatically reverts to the
* prior collection when it exits.
*
* db.collection( 'collection_name' ) # => db * db.collection( 'collection_name' ) # => db
* db.collection( nil ) # => db (main) * db.collection( nil ) # => db (main)
* *
* Operate on a sub-database "collection". Passing +nil+ * db.collection( 'collection_name' ) do
* sets the database to the main, top-level namespace. * [ ... ]
* end #=> reverts to the previous collection name
* *
*/ */
VALUE VALUE
rmdbx_set_subdb( int argc, VALUE *argv, VALUE self ) rmdbx_set_subdb( int argc, VALUE *argv, VALUE self )
{ {
UNWRAP_DB( self, db ); UNWRAP_DB( self, db );
VALUE subdb; VALUE subdb, block;
char *prev_db = NULL;
rb_scan_args( argc, argv, "01", &subdb ); rb_scan_args( argc, argv, "01&", &subdb, &block );
if ( argc == 0 ) { if ( argc == 0 ) {
if ( db->subdb == NULL ) return Qnil; if ( db->subdb == NULL ) return Qnil;
return rb_str_new_cstr( db->subdb ); return rb_str_new_cstr( db->subdb );
} }
/* All transactions must be closed when switching database handles. */
if ( db->txn ) rb_raise( rmdbx_eDatabaseError, "Unable to change collection: finish current transaction" );
/* Retain the prior database collection if a
* block was passed. */
if ( rb_block_given_p() ) {
if ( db->subdb != NULL ) {
prev_db = (char *) malloc( strlen(db->subdb) + 1 );
strcpy( prev_db, db->subdb );
}
}
rb_iv_set( self, "@collection", subdb ); rb_iv_set( self, "@collection", subdb );
db->subdb = NIL_P( subdb ) ? NULL : StringValueCStr( subdb ); db->subdb = NIL_P( subdb ) ? NULL : StringValueCStr( subdb );
rmdbx_close_dbi( db );
/* Close any currently open dbi handle, to be re-opened with /*
* the new collection on next access.
*
FIXME: Immediate transaction write to auto-create new env? FIXME: Immediate transaction write to auto-create new env?
Fetching from here at the moment causes an error if you Fetching from here at the moment causes an error if you
haven't written anything yet. haven't written anything to the new collection yet.
*/ */
if ( db->dbi ) {
mdbx_dbi_close( db->env, db->dbi ); /* Revert to the previous collection after the block is done. */
db->dbi = 0; if ( rb_block_given_p() ) {
rb_yield( self );
if ( db->subdb != prev_db ) {
rb_iv_set( self, "@collection", prev_db ? rb_str_new_cstr(prev_db) : Qnil );
db->subdb = prev_db;
rmdbx_close_dbi( db );
}
free( prev_db );
} }
return self; return self;
@ -422,15 +513,14 @@ rmdbx_set_subdb( int argc, VALUE *argv, VALUE self )
/* /*
* call-seq: * Open an existing (or create a new) mdbx database at filesystem
* +path+. In block form, the database is automatically closed.
*
* MDBX::Database.open( path ) -> db * MDBX::Database.open( path ) -> db
* MDBX::Database.open( path, options ) -> db * MDBX::Database.open( path, options ) -> db
* MDBX::Database.open( path, options ) do |db| * MDBX::Database.open( path, options ) do |db|
* db... * db...
* end * end
*
* Open an existing (or create a new) mdbx database at filesystem
* +path+. In block form, the database is automatically closed.
* *
*/ */
VALUE VALUE
@ -458,7 +548,8 @@ rmdbx_database_initialize( int argc, VALUE *argv, VALUE self )
db->cursor = NULL; db->cursor = NULL;
db->path = StringValueCStr( path ); db->path = StringValueCStr( path );
db->subdb = NULL; db->subdb = NULL;
db->state.open = 0; db->state.open = 0;
db->state.retain_txn = -1;
db->settings.env_flags = MDBX_ENV_DEFAULTS; db->settings.env_flags = MDBX_ENV_DEFAULTS;
db->settings.mode = 0644; db->settings.mode = 0644;
db->settings.max_collections = 0; db->settings.max_collections = 0;
@ -530,11 +621,16 @@ rmdbx_init_database()
rb_define_method( rmdbx_cDatabase, "close", rmdbx_close, 0 ); rb_define_method( rmdbx_cDatabase, "close", rmdbx_close, 0 );
rb_define_method( rmdbx_cDatabase, "reopen", rmdbx_open_env, 0 ); rb_define_method( rmdbx_cDatabase, "reopen", rmdbx_open_env, 0 );
rb_define_method( rmdbx_cDatabase, "closed?", rmdbx_closed_p, 0 ); rb_define_method( rmdbx_cDatabase, "closed?", rmdbx_closed_p, 0 );
rb_define_method( rmdbx_cDatabase, "in_transaction?", rmdbx_in_transaction_p, 0 );
rb_define_method( rmdbx_cDatabase, "clear", rmdbx_clear, 0 ); rb_define_method( rmdbx_cDatabase, "clear", rmdbx_clear, 0 );
rb_define_method( rmdbx_cDatabase, "keys", rmdbx_keys, 0 ); rb_define_method( rmdbx_cDatabase, "keys", rmdbx_keys, 0 );
rb_define_method( rmdbx_cDatabase, "[]", rmdbx_get_val, 1 ); rb_define_method( rmdbx_cDatabase, "[]", rmdbx_get_val, 1 );
rb_define_method( rmdbx_cDatabase, "[]=", rmdbx_put_val, 2 ); rb_define_method( rmdbx_cDatabase, "[]=", rmdbx_put_val, 2 );
/* Manually open/close transactions from ruby. */
rb_define_protected_method( rmdbx_cDatabase, "open_transaction", rmdbx_rb_opentxn, 1 );
rb_define_protected_method( rmdbx_cDatabase, "close_transaction", rmdbx_rb_closetxn, 1 );
rb_define_protected_method( rmdbx_cDatabase, "raw_stats", rmdbx_stats, 0 ); rb_define_protected_method( rmdbx_cDatabase, "raw_stats", rmdbx_stats, 0 );
rb_require( "mdbx/database" ); rb_require( "mdbx/database" );

View file

@ -31,6 +31,7 @@ struct rmdbx_db {
struct { struct {
int open; int open;
int retain_txn;
} state; } state;
char *path; char *path;

View file

@ -84,7 +84,7 @@ rmdbx_gather_environment_stats(
INT2NUM(menvinfo.mi_recent_txnid) ); INT2NUM(menvinfo.mi_recent_txnid) );
rb_hash_aset( environ, ID2SYM(rb_intern("last_reader_txnid")), rb_hash_aset( environ, ID2SYM(rb_intern("last_reader_txnid")),
INT2NUM(menvinfo.mi_latter_reader_txnid) ); INT2NUM(menvinfo.mi_latter_reader_txnid) );
rb_hash_aset( environ, ID2SYM(rb_intern("maximum_readers")), rb_hash_aset( environ, ID2SYM(rb_intern("max_readers")),
INT2NUM(menvinfo.mi_maxreaders) ); INT2NUM(menvinfo.mi_maxreaders) );
rb_hash_aset( environ, ID2SYM(rb_intern("readers_in_use")), rb_hash_aset( environ, ID2SYM(rb_intern("readers_in_use")),
INT2NUM(menvinfo.mi_numreaders) ); INT2NUM(menvinfo.mi_numreaders) );
@ -183,8 +183,7 @@ rmdbx_gather_stats( rmdbx_db_t *db )
rmdbx_gather_environment_stats( stat, mstat, menvinfo ); rmdbx_gather_environment_stats( stat, mstat, menvinfo );
rmdbx_gather_reader_stats( db, stat, mstat, menvinfo ); rmdbx_gather_reader_stats( db, stat, mstat, menvinfo );
/* database and subdatabases */ /* TODO: database and subdatabase stats */
return stat; return stat;
} }

View file

@ -18,8 +18,6 @@ class MDBX::Database
### db[ 'key' ] #=> value ### db[ 'key' ] #=> value
### end ### end
### ###
### FIXME: document all options!
###
def self::open( *args, &block ) def self::open( *args, &block )
db = new( *args ) db = new( *args )
@ -49,9 +47,11 @@ class MDBX::Database
attr_reader :path attr_reader :path
# A Proc for automatically serializing values. # A Proc for automatically serializing values.
# Defaults to +Marshal.dump+.
attr_accessor :serializer attr_accessor :serializer
# A Proc for automatically deserializing values. # A Proc for automatically deserializing values.
# Defaults to +Marshal.load+.
attr_accessor :deserializer attr_accessor :deserializer
@ -65,6 +65,56 @@ class MDBX::Database
alias_method :namespace, :collection alias_method :namespace, :collection
### Open a new mdbx read/write transaction. In block form,
### the transaction is automatically committed.
###
### Raising a MDBX::Rollback exception from within the block
### automatically rolls the transaction back.
###
def transaction( commit: true, &block )
self.open_transaction( commit )
yield self if block_given?
return self
rescue MDBX::Rollback
commit = false
self.rollback
rescue
commit = false
self.rollback
raise
ensure
if block_given?
commit ? self.commit : self.rollback
end
end
### Open a new mdbx read only snapshot. In block form,
### the snapshot is automatically closed.
###
def snapshot( &block )
self.transaction( commit: false, &block )
end
### Close any open transactions, abandoning all changes.
###
def rollback
return self.close_transaction( false )
end
alias_method :abort, :rollback
### Close any open transactions, writing all changes.
###
def commit
return self.close_transaction( true )
end
alias_method :save, :commit
### Return a hash of various metadata for the current database. ### Return a hash of various metadata for the current database.
### ###
def statistics def statistics

View file

@ -1,4 +1,5 @@
#!/usr/bin/env rspec -cfd #!/usr/bin/env rspec -cfd
# vim: set nosta noet ts=4 sw=4 ft=ruby:
require_relative '../lib/helper' require_relative '../lib/helper'
@ -113,6 +114,12 @@ RSpec.describe( MDBX::Database ) do
expect( db['key'] ).to be_truthy expect( db['key'] ).to be_truthy
end end
it "revert back to the previous collection when used in a block" do
expect( db.collection ).to be_nil
db.collection( 'bucket' ) { 'no-op' }
expect( db.collection ).to be_nil
end
it "can be cleared of contents" do it "can be cleared of contents" do
db.collection( 'bucket' ) db.collection( 'bucket' )
10.times {|i| db[i] = true } 10.times {|i| db[i] = true }

57
spec/mdbx/stats_spec.rb Normal file
View file

@ -0,0 +1,57 @@
#!/usr/bin/env rspec -cfd
# vim: set nosta noet ts=4 sw=4 ft=ruby:
require_relative '../lib/helper'
RSpec.fdescribe( MDBX::Database ) do
let!( :db ) { described_class.open( TEST_DATABASE.to_s, max_readers: 500 ) }
let( :stats ) { db.statistics }
after( :each ) do
db.close
end
it "returns the configured max_readers" do
expect( stats.dig(:environment, :max_readers) ).to be >= 500
end
it "returns compile time flags and options" do
build = stats[ :build ]
expect( build.keys.size ).to be( 4 )
expect( build.keys ).to include( :compiler, :flags, :options, :target )
expect( build[:compiler] ).to be_a( String )
expect( build[:flags] ).to be_a( String )
expect( build[:target] ).to be_a( String )
expect( build[:options] ).to be_a( Hash )
end
end
__END__
{:environment=>
{:pagesize=>4096,
:last_txnid=>125,
:last_reader_txnid=>125,
:maximum_readers=>122,
:readers_in_use=>1,
:datafile=>
{:size_current=>65536,
:pages=>16,
:type=>"dynamic",
:size_lower=>12288,
:size_upper=>1048576,
:growth_step=>65536,
:shrink_threshold=>131072}},
:readers=>
[{:slot=>0,
:pid=>45436,
:thread=>34374651904,
:txnid=>0,
:lag=>0,
:bytes_used=>0,
:bytes_retired=>0}]}
}