libdb/test/csharp/ReplicationTest.cs
2012-11-14 16:35:20 -05:00

996 lines
31 KiB
C#

/*-
* See the file LICENSE for redistribution information.
*
* Copyright (c) 2009, 2012 Oracle and/or its affiliates. All rights reserved.
*
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.NetworkInformation;
using System.Text;
using System.Threading;
using System.Xml;
using NUnit.Framework;
using BerkeleyDB;
namespace CsharpAPITest
{
[TestFixture]
public class ReplicationTest : CSharpTestFixture
{
private EventWaitHandle clientStartSignal;
private EventWaitHandle masterCloseSignal;
private EventWaitHandle client1StartSignal;
private EventWaitHandle client2StartSignal;
private EventWaitHandle client3StartSignal;
private EventWaitHandle clientsElectionSignal;
private EventWaitHandle masterLeaveSignal;
private uint startUpDone;
private bool electionDone;
List<uint> ports = new List<uint>();
[TestFixtureSetUp]
public void SetUp()
{
testFixtureName = "ReplicationTest";
base.SetUpTestfixture();
}
[Test]
public void TestRepMgrSite()
{
testName = "TestRepMgrSite";
SetUpTest(true);
string masterHome = testHome + "\\Master";
Configuration.ClearDir(masterHome);
string clientHome = testHome + "\\Client";
Configuration.ClearDir(clientHome);
ports.Clear();
AvailablePorts portGen = new AvailablePorts();
uint mPort = portGen.Current;
portGen.MoveNext();
uint cPort = portGen.Current;
// Open environment with replication configuration.
DatabaseEnvironmentConfig cfg =
new DatabaseEnvironmentConfig();
cfg.Create = true;
cfg.RunRecovery = true;
cfg.UseLocking = true;
cfg.UseLogging = true;
cfg.UseMPool = true;
cfg.UseReplication = true;
cfg.FreeThreaded = true;
cfg.UseTxns = true;
cfg.EventNotify = new EventNotifyDelegate(stuffHappened);
cfg.RepSystemCfg = new ReplicationConfig();
cfg.RepSystemCfg.RepmgrSitesConfig.Add(new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[0].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = mPort;
cfg.RepSystemCfg.RepmgrSitesConfig[0].LocalSite = true;
cfg.RepSystemCfg.RepmgrSitesConfig[0].GroupCreator = true;
cfg.RepSystemCfg.Priority = 100;
DatabaseEnvironment mEnv = DatabaseEnvironment.Open(
masterHome, cfg);
mEnv.DeadlockResolution = DeadlockPolicy.DEFAULT;
mEnv.RepMgrStartMaster(2);
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = cPort;
cfg.RepSystemCfg.RepmgrSitesConfig[0].GroupCreator = false;
cfg.RepSystemCfg.Priority = 10;
cfg.RepSystemCfg.RepmgrSitesConfig.Add(new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[1].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[1].Port = mPort;
cfg.RepSystemCfg.RepmgrSitesConfig[1].Helper = true;
DatabaseEnvironment cEnv = DatabaseEnvironment.Open(
clientHome, cfg);
cEnv.RepMgrStartClient(2, false);
/* Wait for client to start up */
int i = 0;
while (!cEnv.ReplicationSystemStats().ClientStartupComplete) {
if (i < 20) {
Thread.Sleep(1000);
i++;
} else
throw new TestException();
}
/*
* Verify the client info could be observed by master's
* remote site.
*/
Assert.AreEqual(1, mEnv.RepMgrRemoteSites.Length);
RepMgrSite rsite = mEnv.RepMgrRemoteSites[0];
Assert.AreEqual("127.0.0.1", rsite.Address.Host);
Assert.AreEqual(cPort, rsite.Address.Port);
Assert.AreEqual(true, rsite.isConnected);
Assert.AreEqual(false, rsite.isPeer);
DbSite site = mEnv.RepMgrSite("127.0.0.1", mPort);
Assert.AreEqual("127.0.0.1", site.Address.Host);
Assert.AreEqual(mPort, site.Address.Port);
Assert.LessOrEqual(0, site.EId);
Assert.AreEqual(true, site.GroupCreator);
Assert.AreEqual(true, site.LocalSite);
Assert.AreEqual(false, site.Helper);
Assert.AreEqual(false, site.Legacy);
Assert.AreEqual(false, site.Peer);
site.Close();
site = mEnv.RepMgrSite("127.0.0.1", cPort);
Assert.AreEqual("127.0.0.1", site.Address.Host);
Assert.AreEqual(cPort, site.Address.Port);
Assert.AreEqual(rsite.EId, site.EId);
Assert.AreEqual(false, site.GroupCreator);
Assert.AreEqual(false, site.LocalSite);
Assert.AreEqual(false, site.Helper);
Assert.AreEqual(false, site.Legacy);
Assert.AreEqual(false, site.Peer);
site.Remove();
cEnv.Close();
mEnv.Close();
/*
* Update the repmgr site, and verify it is updated in
* unmanaged memory.
*/
rsite.Address = new ReplicationHostAddress(
"192.168.1.1", 1000);
rsite.EId = 1024;
rsite.isConnected = false;
rsite.isPeer = true;
Assert.AreEqual("192.168.1.1", rsite.Address.Host);
Assert.AreEqual(1000, rsite.Address.Port);
Assert.AreEqual(1024, rsite.EId);
Assert.AreEqual(false, rsite.isConnected);
Assert.AreEqual(true, rsite.isPeer);
}
[Test]
public void TestRepMgr()
{
testName = "TestRepMgr";
SetUpTest(true);
// Initialize ports for master and client.
ports.Clear();
AvailablePorts portGen = new AvailablePorts();
ports.Insert(0, portGen.Current);
portGen.MoveNext();
ports.Insert(1, portGen.Current);
clientStartSignal = new AutoResetEvent(false);
masterCloseSignal = new AutoResetEvent(false);
Thread thread1 = new Thread(new ThreadStart(Master));
Thread thread2 = new Thread(new ThreadStart(Client));
// Start master thread before client thread.
thread1.Start();
Thread.Sleep(1000);
thread2.Start();
thread2.Join();
thread1.Join();
clientStartSignal.Close();
masterCloseSignal.Close();
}
public void Master()
{
string home = testHome + "/Master";
string dbName = "rep.db";
Configuration.ClearDir(home);
/*
* Configure and open environment with replication
* application.
*/
DatabaseEnvironmentConfig cfg =
new DatabaseEnvironmentConfig();
cfg.UseReplication = true;
cfg.MPoolSystemCfg = new MPoolConfig();
cfg.MPoolSystemCfg.CacheSize =
new CacheInfo(0, 20485760, 1);
cfg.UseLocking = true;
cfg.UseTxns = true;
cfg.UseMPool = true;
cfg.Create = true;
cfg.UseLogging = true;
cfg.RunRecovery = true;
cfg.TxnNoSync = true;
cfg.FreeThreaded = true;
cfg.RepSystemCfg = new ReplicationConfig();
cfg.RepSystemCfg.RepmgrSitesConfig.Add(new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[0].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = ports[0];
cfg.RepSystemCfg.RepmgrSitesConfig[0].LocalSite = true;
cfg.RepSystemCfg.Priority = 100;
cfg.RepSystemCfg.BulkTransfer = true;
cfg.RepSystemCfg.AckTimeout = 2000;
cfg.RepSystemCfg.BulkTransfer = true;
cfg.RepSystemCfg.CheckpointDelay = 1500;
cfg.RepSystemCfg.Clockskew(102, 100);
cfg.RepSystemCfg.ConnectionRetry = 10;
cfg.RepSystemCfg.DelayClientSync = false;
cfg.RepSystemCfg.ElectionRetry = 5;
cfg.RepSystemCfg.ElectionTimeout = 3000;
cfg.RepSystemCfg.FullElectionTimeout = 5000;
cfg.RepSystemCfg.HeartbeatMonitor = 100;
cfg.RepSystemCfg.HeartbeatSend = 10;
cfg.RepSystemCfg.LeaseTimeout = 1300;
cfg.RepSystemCfg.AutoInit = true;
cfg.RepSystemCfg.NoBlocking = false;
cfg.RepSystemCfg.RepMgrAckPolicy =
AckPolicy.ALL_PEERS;
cfg.RepSystemCfg.RetransmissionRequest(10, 100);
cfg.RepSystemCfg.Strict2Site = true;
cfg.RepSystemCfg.UseMasterLeases = false;
cfg.EventNotify = new EventNotifyDelegate(stuffHappened);
DatabaseEnvironment env = DatabaseEnvironment.Open(
home, cfg);
// Get initial replication stats.
ReplicationStats repStats = env.ReplicationSystemStats();
env.PrintReplicationSystemStats();
Assert.AreEqual(100, repStats.EnvPriority);
Assert.AreEqual(1,
repStats.CurrentElectionGenerationNumber);
Assert.AreEqual(0, repStats.CurrentGenerationNumber);
Assert.AreEqual(0, repStats.AppliedTransactions);
Assert.AreEqual(0, repStats.ElectionDataGeneration);
// Start a master site with replication manager.
env.RepMgrStartMaster(3);
// Open a btree database and write some data.
Transaction txn = env.BeginTransaction();
BTreeDatabaseConfig dbConfig =
new BTreeDatabaseConfig();
dbConfig.Creation = CreatePolicy.IF_NEEDED;
dbConfig.Env = env;
dbConfig.PageSize = 512;
BTreeDatabase db = BTreeDatabase.Open(dbName,
dbConfig, txn);
txn.Commit();
txn = env.BeginTransaction();
for (int i = 0; i < 5; i++)
db.Put(new DatabaseEntry(BitConverter.GetBytes(i)),
new DatabaseEntry(BitConverter.GetBytes(i)), txn);
txn.Commit();
Console.WriteLine(
"Master: Finished initialization and data#1.");
// Client site could enter now.
clientStartSignal.Set();
Console.WriteLine(
"Master: Wait for Client to join and get #1.");
Console.WriteLine("...");
// Put some new data into master site.
txn = env.BeginTransaction();
for (int i = 10; i < 15; i++)
db.Put(new DatabaseEntry(BitConverter.GetBytes(i)),
new DatabaseEntry(BitConverter.GetBytes(i)),
txn);
txn.Commit();
Console.WriteLine(
"Master: Write something new, data #2.");
Console.WriteLine("Master: Wait for client to read #2...");
// Get the stats.
repStats = env.ReplicationSystemStats(true);
env.PrintReplicationSystemStats();
Assert.LessOrEqual(0, repStats.AppliedTransactions);
Assert.LessOrEqual(0, repStats.AwaitedLSN.LogFileNumber);
Assert.LessOrEqual(0, repStats.AwaitedLSN.Offset);
Assert.LessOrEqual(0, repStats.AwaitedPage);
Assert.LessOrEqual(0, repStats.BadGenerationMessages);
Assert.LessOrEqual(0, repStats.BulkBufferFills);
Assert.LessOrEqual(0, repStats.BulkBufferOverflows);
Assert.LessOrEqual(0, repStats.BulkBufferTransfers);
Assert.LessOrEqual(0, repStats.BulkRecordsStored);
Assert.LessOrEqual(0, repStats.ClientServiceRequests);
Assert.LessOrEqual(0, repStats.ClientServiceRequestsMissing);
Assert.IsInstanceOf(typeof(bool), repStats.ClientStartupComplete);
Assert.AreEqual(2, repStats.CurrentElectionGenerationNumber);
Assert.AreEqual(1, repStats.CurrentGenerationNumber);
Assert.LessOrEqual(0, repStats.CurrentQueuedLogRecords);
Assert.LessOrEqual(0, repStats.CurrentWinner);
Assert.LessOrEqual(0, repStats.CurrentWinnerMaxLSN.LogFileNumber);
Assert.LessOrEqual(0, repStats.CurrentWinnerMaxLSN.Offset);
Assert.LessOrEqual(0, repStats.DuplicateLogRecords);
Assert.LessOrEqual(0, repStats.DuplicatePages);
Assert.LessOrEqual(0, repStats.DupMasters);
Assert.LessOrEqual(0, repStats.ElectionGenerationNumber);
Assert.LessOrEqual(0, repStats.ElectionPriority);
Assert.LessOrEqual(0, repStats.Elections);
Assert.LessOrEqual(0, repStats.ElectionStatus);
Assert.LessOrEqual(0, repStats.ElectionsWon);
Assert.LessOrEqual(0, repStats.ElectionTiebreaker);
Assert.LessOrEqual(0, repStats.ElectionTimeSec);
Assert.LessOrEqual(0, repStats.ElectionTimeUSec);
Assert.AreEqual(repStats.EnvID, repStats.MasterEnvID);
Assert.LessOrEqual(0, repStats.EnvPriority);
Assert.LessOrEqual(0, repStats.FailedMessageSends);
Assert.LessOrEqual(0, repStats.ForcedRerequests);
Assert.LessOrEqual(0, repStats.IgnoredMessages);
Assert.LessOrEqual(0, repStats.MasterChanges);
Assert.LessOrEqual(0, repStats.MasterEnvID);
Assert.LessOrEqual(0, repStats.MaxLeaseSec);
Assert.LessOrEqual(0, repStats.MaxLeaseUSec);
Assert.LessOrEqual(0, repStats.MaxPermanentLSN.Offset);
Assert.LessOrEqual(0, repStats.MaxQueuedLogRecords);
Assert.LessOrEqual(0, repStats.MessagesSent);
Assert.LessOrEqual(0, repStats.MissedLogRecords);
Assert.LessOrEqual(0, repStats.MissedPages);
Assert.LessOrEqual(0, repStats.NewSiteMessages);
Assert.LessOrEqual(repStats.MaxPermanentLSN.LogFileNumber,
repStats.NextLSN.LogFileNumber);
if (repStats.MaxPermanentLSN.LogFileNumber ==
repStats.NextLSN.LogFileNumber)
Assert.Less(repStats.MaxPermanentLSN.Offset,
repStats.NextLSN.Offset);
Assert.LessOrEqual(0, repStats.NextPage);
Assert.LessOrEqual(0, repStats.Outdated);
Assert.LessOrEqual(0, repStats.QueuedLogRecords);
Assert.LessOrEqual(0, repStats.ReceivedLogRecords);
Assert.LessOrEqual(0, repStats.ReceivedMessages);
Assert.LessOrEqual(0, repStats.ReceivedPages);
Assert.LessOrEqual(0, repStats.RegisteredSites);
Assert.LessOrEqual(0, repStats.RegisteredSitesNeeded);
Assert.LessOrEqual(0, repStats.Sites);
Assert.LessOrEqual(0, repStats.StartSyncMessagesDelayed);
Assert.AreEqual(2, repStats.Status);
Assert.LessOrEqual(0, repStats.Throttled);
Assert.LessOrEqual(0, repStats.Votes);
// Get replication manager statistics.
RepMgrStats repMgrStats = env.RepMgrSystemStats(true);
Assert.LessOrEqual(0, repMgrStats.DroppedConnections);
Assert.LessOrEqual(0, repMgrStats.DroppedMessages);
Assert.LessOrEqual(0, repMgrStats.FailedConnections);
Assert.LessOrEqual(0, repMgrStats.FailedMessages);
Assert.LessOrEqual(0, repMgrStats.QueuedMessages);
// Print them out.
env.PrintRepMgrSystemStats();
// Wait until client has finished reading.
masterCloseSignal.WaitOne();
Console.WriteLine("Master: Leave as well.");
// Close all.
db.Close(false);
env.LogFlush();
env.Close();
}
public void Client()
{
string home = testHome + "/Client";
Configuration.ClearDir(home);
clientStartSignal.WaitOne();
Console.WriteLine("Client: Join the replication");
// Open a environment.
DatabaseEnvironmentConfig cfg =
new DatabaseEnvironmentConfig();
cfg.UseReplication = true;
cfg.MPoolSystemCfg = new MPoolConfig();
cfg.MPoolSystemCfg.CacheSize =
new CacheInfo(0, 20485760, 1);
cfg.UseLocking = true;
cfg.UseTxns = true;
cfg.UseMPool = true;
cfg.Create = true;
cfg.UseLogging = true;
cfg.RunRecovery = true;
cfg.TxnNoSync = true;
cfg.FreeThreaded = true;
cfg.LockTimeout = 50000;
cfg.RepSystemCfg = new ReplicationConfig();
cfg.RepSystemCfg.RepmgrSitesConfig.Add(new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[0].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = ports[1];
cfg.RepSystemCfg.RepmgrSitesConfig[0].LocalSite = true;
cfg.RepSystemCfg.Priority = 10;
cfg.RepSystemCfg.RepmgrSitesConfig.Add(new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[1].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[1].Port = ports[0];
cfg.RepSystemCfg.RepmgrSitesConfig[1].Helper = true;
cfg.EventNotify = new EventNotifyDelegate(stuffHappened);
DatabaseEnvironment env = DatabaseEnvironment.Open(
home, cfg);
// Start a client site with replication manager.
env.RepMgrStartClient(3, false);
// Leave enough time to sync.
Thread.Sleep(20000);
// Open database.
BTreeDatabaseConfig dbConfig =
new BTreeDatabaseConfig();
dbConfig.Creation = CreatePolicy.NEVER;
dbConfig.AutoCommit = true;
dbConfig.Env = env;
dbConfig.PageSize = 512;
BTreeDatabase db = BTreeDatabase.Open("rep.db",
dbConfig);
// Write data into database.
Console.WriteLine("Client: Start reading data #1.");
for (int i = 0; i < 5; i++)
db.GetBoth(new DatabaseEntry(
BitConverter.GetBytes(i)), new DatabaseEntry(
BitConverter.GetBytes(i)));
// Leave sometime for client to read new data from master.
Thread.Sleep(20000);
/*
* Read the data. All data exists in master site should
* appear in the client site.
*/
Console.WriteLine("Client: Start reading data #2.");
for (int i = 10; i < 15; i++)
db.GetBoth(new DatabaseEntry(
BitConverter.GetBytes(i)), new DatabaseEntry(
BitConverter.GetBytes(i)));
// Get the latest replication subsystem statistics.
ReplicationStats repStats = env.ReplicationSystemStats();
Assert.IsTrue(repStats.ClientStartupComplete);
Assert.LessOrEqual(0, repStats.DuplicateLogRecords);
Assert.LessOrEqual(0, repStats.EnvID);
Assert.LessOrEqual(0, repStats.NextPage);
Assert.LessOrEqual(0, repStats.ReceivedPages);
Assert.AreEqual(1, repStats.Status);
// Close all.
db.Close(false);
env.LogFlush();
env.Close();
Console.WriteLine(
"Client: All data is read. Leaving the replication");
// The master is closed after client's close.
masterCloseSignal.Set();
}
private void stuffHappened(NotificationEvent eventCode, byte[] info)
{
switch (eventCode)
{
case NotificationEvent.REP_CLIENT:
Console.WriteLine("Event: CLIENT");
break;
case NotificationEvent.REP_CONNECT_BROKEN:
Console.WriteLine("Event: REP_CONNECT_BROKEN");
break;
case NotificationEvent.REP_CONNECT_ESTD:
Console.WriteLine("Event: REP_CONNECT_ESTD");
break;
case NotificationEvent.REP_CONNECT_TRY_FAILED:
Console.WriteLine("Event: REP_CONNECT_TRY_FAILED");
break;
case NotificationEvent.REP_MASTER:
Console.WriteLine("Event: MASTER");
break;
case NotificationEvent.REP_NEWMASTER:
electionDone = true;
Console.WriteLine("Event: NEWMASTER");
break;
case NotificationEvent.REP_LOCAL_SITE_REMOVED:
Console.WriteLine("Event: REP_LOCAL_SITE_REMOVED");
break;
case NotificationEvent.REP_SITE_ADDED:
Console.WriteLine("Event: REP_SITE_ADDED");
break;
case NotificationEvent.REP_SITE_REMOVED:
Console.WriteLine("Event: REP_SITE_REMOVED");
break;
case NotificationEvent.REP_STARTUPDONE:
startUpDone++;
Console.WriteLine("Event: REP_STARTUPDONE");
break;
case NotificationEvent.REP_PERM_FAILED:
Console.WriteLine("Event: Insufficient Acks.");
break;
default:
Console.WriteLine("Event: {0}", eventCode);
break;
}
}
[Test]
public void TestElection()
{
testName = "TestElection";
SetUpTest(true);
// Initialize ports for one master, and three clients.
ports.Clear();
AvailablePorts portGen = new AvailablePorts();
ports.Insert(0, portGen.Current);
portGen.MoveNext();
ports.Insert(1, portGen.Current);
portGen.MoveNext();
ports.Insert(2, portGen.Current);
portGen.MoveNext();
ports.Insert(3, portGen.Current);
/*
* The *Signals are used as triggers to control the test
* work flow. The client1StartSignal would be set once
* the master is ready and notify the client1 to start.
* The client2StartSignal would be set once the client1
* is ready and notify the client2 to start. The
* client3StartSignal is similar. The masterLeaveSignal
* would be set once the last client client3 is ready
* and notify the master to leave. The
* clientsElectionSignal would be set when the master
* has already left and new master would be elected.
*/
client1StartSignal = new AutoResetEvent(false);
client2StartSignal = new AutoResetEvent(false);
client3StartSignal = new AutoResetEvent(false);
clientsElectionSignal = new AutoResetEvent(false);
masterLeaveSignal = new AutoResetEvent(false);
// Count startup done event.
startUpDone = 0;
// Whether finish election.
electionDone = false;
Thread thread1 = new Thread(
new ThreadStart(UnstableMaster));
Thread thread2 = new Thread(
new ThreadStart(StableClient1));
Thread thread3 = new Thread(
new ThreadStart(StableClient2));
Thread thread4 = new Thread(
new ThreadStart(StableClient3));
try {
thread1.Start();
/*
* After start up is done at master, start
* client1. Wait for the signal for 50000 ms.
*/
if (!client1StartSignal.WaitOne(50000))
throw new TestException();
thread2.Start();
// Ready to start client2.
if (!client2StartSignal.WaitOne(50000))
throw new TestException();
thread3.Start();
// Ready to start client3.
if (!client3StartSignal.WaitOne(50000))
throw new TestException();
thread4.Start();
thread4.Join();
thread3.Join();
thread2.Join();
thread1.Join();
Assert.IsTrue(electionDone);
} catch (TestException e) {
if (thread1.IsAlive)
thread1.Abort();
if (thread2.IsAlive)
thread2.Abort();
if (thread3.IsAlive)
thread3.Abort();
if (thread4.IsAlive)
thread4.Abort();
throw e;
} finally {
client1StartSignal.Close();
client2StartSignal.Close();
client3StartSignal.Close();
clientsElectionSignal.Close();
masterLeaveSignal.Close();
}
}
public void UnstableMaster()
{
string home = testHome + "/UnstableMaster";
Configuration.ClearDir(home);
// Open environment with replication configuration.
DatabaseEnvironmentConfig cfg =
new DatabaseEnvironmentConfig();
cfg.UseReplication = true;
cfg.MPoolSystemCfg = new MPoolConfig();
cfg.MPoolSystemCfg.CacheSize =
new CacheInfo(0, 20485760, 1);
cfg.UseLocking = true;
cfg.UseTxns = true;
cfg.UseMPool = true;
cfg.Create = true;
cfg.UseLogging = true;
cfg.RunRecovery = true;
cfg.TxnNoSync = true;
cfg.FreeThreaded = true;
cfg.RepSystemCfg = new ReplicationConfig();
cfg.RepSystemCfg.RepmgrSitesConfig.Add(
new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[0].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = ports[0];
cfg.RepSystemCfg.RepmgrSitesConfig[0].LocalSite = true;
cfg.RepSystemCfg.Priority = 200;
cfg.RepSystemCfg.ElectionRetry = 10;
cfg.RepSystemCfg.RepMgrAckPolicy = AckPolicy.ALL;
cfg.EventNotify = new EventNotifyDelegate(stuffHappened);
DatabaseEnvironment env = DatabaseEnvironment.Open(
home, cfg);
env.DeadlockResolution = DeadlockPolicy.DEFAULT;
try {
// Start as master site.
env.RepMgrStartMaster(3);
Console.WriteLine(
"Master: Create a new repmgr group");
// Client1 could start now.
client1StartSignal.Set();
// Wait for initialization of all clients.
if (!masterLeaveSignal.WaitOne(50000))
throw new TestException();
// Check remote sites are valid.
foreach (RepMgrSite site in
env.RepMgrRemoteSites) {
Assert.AreEqual("127.0.0.1",
site.Address.Host);
Assert.IsTrue(ports.Contains(
site.Address.Port));
}
// Close the current master.
Console.WriteLine("Master: Unexpected leave.");
env.LogFlush();
} catch(Exception e) {
Console.WriteLine(e.Message);
} finally {
env.Close();
/*
* Clean up electionDone and startUpDone to
* check election for new master and start-up
* done on clients.
*/
electionDone = false;
startUpDone = 0;
/*
* Need to set signals for three times, each
* site would wait for one.
*/
for (int i = 0; i < 3; i++)
clientsElectionSignal.Set();
}
}
public void StableClient1()
{
StableClient(testHome + "/StableClient1", 1, ports[1],
10, ports[0], 0, client2StartSignal);
}
public void StableClient2()
{
StableClient(testHome + "/StableClient2", 2, ports[2],
20, ports[0], ports[3], client3StartSignal);
}
public void StableClient3()
{
StableClient(testHome + "/StableClient3", 3, ports[3],
80, ports[0], ports[2], masterLeaveSignal);
}
public void StableClient(string home, int clientIdx,
uint localPort, uint priority,
uint helperPort, uint peerPort,
EventWaitHandle notifyHandle)
{
int timeout = 0;
Configuration.ClearDir(home);
Console.WriteLine(
"Client{0}: Join the replication", clientIdx);
DatabaseEnvironmentConfig cfg =
new DatabaseEnvironmentConfig();
cfg.Create = true;
cfg.FreeThreaded = true;
cfg.MPoolSystemCfg = new MPoolConfig();
cfg.MPoolSystemCfg.CacheSize =
new CacheInfo(0, 20485760, 1);
cfg.RunRecovery = true;
cfg.TxnNoSync = true;
cfg.UseLocking = true;
cfg.UseMPool = true;
cfg.UseTxns = true;
cfg.UseReplication = true;
cfg.UseLogging = true;
cfg.RepSystemCfg = new ReplicationConfig();
cfg.RepSystemCfg.RepmgrSitesConfig.Add(
new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[0].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[0].Port = localPort;
cfg.RepSystemCfg.RepmgrSitesConfig[0].LocalSite = true;
cfg.RepSystemCfg.Priority = priority;
cfg.RepSystemCfg.RepmgrSitesConfig.Add(
new DbSiteConfig());
cfg.RepSystemCfg.RepmgrSitesConfig[1].Host = "127.0.0.1";
cfg.RepSystemCfg.RepmgrSitesConfig[1].Port = helperPort;
cfg.RepSystemCfg.RepmgrSitesConfig[1].Helper = true;
cfg.RepSystemCfg.ElectionRetry = 100;
cfg.RepSystemCfg.RepMgrAckPolicy = AckPolicy.NONE;
cfg.EventNotify = new EventNotifyDelegate(stuffHappened);
DatabaseEnvironment env = DatabaseEnvironment.Open(
home, cfg);
env.DeadlockResolution = DeadlockPolicy.DEFAULT;
try {
// Start the client.
Assert.AreEqual(clientIdx - 1, startUpDone);
env.RepMgrStartClient(3, false);
while (startUpDone < clientIdx) {
if (timeout > 10)
throw new TestException();
timeout++;
Thread.Sleep(1000);
}
ReplicationStats repStats =
env.ReplicationSystemStats();
Assert.LessOrEqual(0, repStats.Elections);
Assert.LessOrEqual(0,
repStats.ElectionTiebreaker);
Assert.LessOrEqual(0, repStats.ElectionTimeSec +
repStats.ElectionTimeUSec);
Assert.LessOrEqual(0, repStats.MasterChanges);
Assert.LessOrEqual(0, repStats.NewSiteMessages);
Assert.LessOrEqual(0,
repStats.ReceivedLogRecords);
Assert.LessOrEqual(0, repStats.ReceivedMessages);
Assert.LessOrEqual(0, repStats.ReceivedPages);
Assert.GreaterOrEqual(clientIdx + 1,
repStats.RegisteredSitesNeeded);
Assert.LessOrEqual(0, repStats.Sites);
// Notify the next event.
notifyHandle.Set();
// Wait for master's leave.
if (!clientsElectionSignal.WaitOne(50000))
throw new TestException();
timeout = 0;
while (!electionDone) {
if (timeout > 10)
throw new TestException();
timeout++;
Thread.Sleep(1000);
}
env.LogFlush();
timeout = 0;
// Start up done event happens on client.
while (startUpDone < 2) {
if (timeout > 10)
throw new TestException();
timeout++;
Thread.Sleep(1000);
}
Thread.Sleep((int)priority * 100);
} catch (Exception e) {
Console.WriteLine(e.Message);
} finally {
env.Close();
Console.WriteLine(
"Client{0}: Leaving the replication",
clientIdx);
}
}
[Test]
public void TestAckPolicy()
{
testName = "TestAckPolicy";
SetUpTest(true);
SetRepMgrAckPolicy(testHome + "_ALL", AckPolicy.ALL);
SetRepMgrAckPolicy(testHome + "_ALL_AVAILABLE",
AckPolicy.ALL_AVAILABLE);
SetRepMgrAckPolicy(testHome + "_ALL_PEERS",
AckPolicy.ALL_PEERS);
SetRepMgrAckPolicy(testHome + "_NONE",
AckPolicy.NONE);
SetRepMgrAckPolicy(testHome + "_ONE",
AckPolicy.ONE);
SetRepMgrAckPolicy(testHome + "_ONE_PEER",
AckPolicy.ONE_PEER);
SetRepMgrAckPolicy(testHome + "_QUORUM",
AckPolicy.QUORUM);
SetRepMgrAckPolicy(testHome + "_NULL", null);
}
public void SetRepMgrAckPolicy(string home, AckPolicy policy)
{
Configuration.ClearDir(home);
DatabaseEnvironmentConfig envConfig =
new DatabaseEnvironmentConfig();
envConfig.Create = true;
envConfig.UseLocking = true;
envConfig.UseLogging = true;
envConfig.UseMPool = true;
envConfig.UseReplication = true;
envConfig.UseTxns = true;
DatabaseEnvironment env = DatabaseEnvironment.Open(
home, envConfig);
if (policy != null)
{
env.RepMgrAckPolicy = policy;
Assert.AreEqual(policy, env.RepMgrAckPolicy);
}
env.Close();
}
[Test]
public void TestLocalSite() {
testName = "TestLocalSite";
SetUpTest(true);
Configuration.ClearDir(testHome);
DatabaseEnvironmentConfig envConfig =
new DatabaseEnvironmentConfig();
envConfig.Create = true;
envConfig.UseLocking = true;
envConfig.UseLogging = true;
envConfig.UseMPool = true;
envConfig.UseReplication = true;
envConfig.UseTxns = true;
ReplicationHostAddress addr =
new ReplicationHostAddress("localhost:6060");
ReplicationConfig repCfg = new ReplicationConfig();
DbSiteConfig dbSiteConfig = new DbSiteConfig();
dbSiteConfig.Host = addr.Host;
dbSiteConfig.Port = addr.Port;
dbSiteConfig.LocalSite = true;
repCfg.RepmgrSitesConfig.Add(dbSiteConfig);
envConfig.RepSystemCfg = repCfg;
DatabaseEnvironment env =
DatabaseEnvironment.Open(testHome, envConfig);
ReplicationHostAddress testAddr =
env.RepMgrLocalSite.Address;
Assert.AreEqual(addr.Host, testAddr.Host);
Assert.AreEqual(addr.Port, testAddr.Port);
env.Close();
}
public class AvailablePorts : IEnumerable<uint>
{
private List<uint> availablePort = new List<uint>();
private List<uint> usingPort = new List<uint>();
private int position = -1;
public AvailablePorts()
{
// Initial usingPort array with all TCP ports being used.
IPGlobalProperties properties = IPGlobalProperties.GetIPGlobalProperties();
foreach (IPEndPoint point in properties.GetActiveTcpListeners())
usingPort.Add((uint)point.Port);
// Initial availablePort array with available ports ranging from 10000 to 15000.
for (uint i = 10000; i <= 15000; i++)
{
if (!usingPort.Contains(i))
availablePort.Add(i);
}
}
IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }
public IEnumerator<uint> GetEnumerator()
{
while (MoveNext())
yield return availablePort[position];
}
public bool MoveNext()
{
position++;
return position < availablePort.Count;
}
public uint Current
{
get {
if (position == -1)
position = 0;
return availablePort[position];
}
}
}
[Test]
public void TestSiteRepConfig()
{
testName = "TestSiteRepConfig";
SetUpTest(true);
Configuration.ClearDir(testHome);
DatabaseEnvironmentConfig envConfig =
new DatabaseEnvironmentConfig();
envConfig.Create = true;
envConfig.UseLocking = true;
envConfig.UseLogging = true;
envConfig.UseMPool = true;
envConfig.UseReplication = true;
envConfig.UseTxns = true;
ReplicationHostAddress addr =
new ReplicationHostAddress("localhost:6060");
ReplicationConfig repCfg = new ReplicationConfig();
DbSiteConfig dbSiteConfig = new DbSiteConfig();
dbSiteConfig.Host = addr.Host;
dbSiteConfig.Port = addr.Port;
dbSiteConfig.LocalSite = true;
repCfg.RepmgrSitesConfig.Add(dbSiteConfig);
// DatabaseEnvironment.RepInMemory defaults to false.
envConfig.RepSystemCfg = repCfg;
DatabaseEnvironment env =
DatabaseEnvironment.Open(testHome, envConfig);
Assert.AreEqual(false, env.RepInMemory);
env.Close();
Assert.Less(0,
Directory.GetFiles(testHome, "__db.rep.*").Length);
// Set RepInMemory as true.
Configuration.ClearDir(testHome);
repCfg.InMemory = true;
envConfig.RepSystemCfg = repCfg;
env = DatabaseEnvironment.Open(testHome, envConfig);
Assert.AreEqual(true, env.RepInMemory);
env.Close();
Assert.AreEqual(0,
Directory.GetFiles(testHome, "__db.rep.*").Length);
// Set RepInMemory as false.
Configuration.ClearDir(testHome);
repCfg.InMemory = false;
envConfig.RepSystemCfg = repCfg;
env = DatabaseEnvironment.Open(testHome, envConfig);
Assert.AreEqual(false, env.RepInMemory);
env.Close();
Assert.Less(0,
Directory.GetFiles(testHome, "__db.rep.*").Length);
}
}
}