summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2010-11-05 22:04:08 +0000
committerStephen D. Huston <shuston@apache.org>2010-11-05 22:04:08 +0000
commite330f0d3937bd7dbf689ae55e5cc490cf186c95f (patch)
tree275b479b2701ddcc47cf367dcf066f2d4c31aad3
parent9db7edfd06c10a59f335aacee32e9d95d5da9a77 (diff)
downloadqpid-python-e330f0d3937bd7dbf689ae55e5cc490cf186c95f.tar.gz
Manage CLFS containers using policies to both create the initial containers and to automatically grow and shrink the log as needed.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1031842 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.cpp67
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/Log.h1
-rw-r--r--qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp1
3 files changed, 46 insertions, 23 deletions
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
index 7e669e281e..e6cb10c133 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.cpp
@@ -21,6 +21,7 @@
#include <windows.h>
#include <clfsw32.h>
+#include <clfsmgmtw32.h>
#include <sstream>
#include <string>
#include <vector>
@@ -47,11 +48,12 @@ Log::open(const std::string& path, const TuningParameters& params)
logPath = path;
std::string logSpec = "log:" + path;
size_t specLength = logSpec.length();
- wchar_t *wLogSpec = new wchar_t[specLength + 1];
+ std::auto_ptr<wchar_t> wLogSpec(new wchar_t[specLength + 1]);
size_t converted;
- mbstowcs_s(&converted, wLogSpec, specLength+1, logSpec.c_str(), specLength);
- wLogSpec[converted] = L'\0';
- handle = ::CreateLogFile(wLogSpec,
+ mbstowcs_s(&converted,
+ wLogSpec.get(), specLength+1,
+ logSpec.c_str(), specLength);
+ handle = ::CreateLogFile(wLogSpec.get(),
GENERIC_WRITE | GENERIC_READ,
0,
0,
@@ -62,31 +64,45 @@ Log::open(const std::string& path, const TuningParameters& params)
ULONG infoSize = sizeof(info);
BOOL ok = ::GetLogFileInformation(handle, &info, &infoSize);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+ ok = ::RegisterManageableLogClient(handle, 0);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ // Set up policies for how many containers to initially create and how
+ // large each container should be. Also, auto-grow the log when container
+ // space runs out.
+ CLFS_MGMT_POLICY logPolicy;
+ logPolicy.Version = CLFS_MGMT_POLICY_VERSION;
+ logPolicy.LengthInBytes = sizeof(logPolicy);
+ logPolicy.PolicyFlags = 0;
+
// If this is the first time this log is opened, give an opportunity to
// initialize its content.
bool needInitialize(false);
if (info.TotalContainers == 0) {
- std::vector<const std::wstring> paths;
- LPWSTR cPaths[1024];
- size_t pathLength = logPath.length();
- wchar_t *wLogPath = new wchar_t[pathLength + 1];
- mbstowcs_s(&converted, wLogPath, pathLength+1,
- logPath.c_str(), pathLength);
- wLogPath[converted] = L'\0';
- for (unsigned short i = 0; i < params.containers && i < 1024; ++i) {
- std::wostringstream path;
- path << wLogPath << L"-container-" << i << std::ends;
- paths.push_back(path.str ());
- cPaths[i] = const_cast<LPWSTR>(paths[i].c_str());
- }
- ok = ::AddLogContainerSet(handle,
- params.containers,
- &this->containerSize,
- cPaths,
- NULL);
+ // New log; set the configured container size and create the
+ // initial set of containers.
+ logPolicy.PolicyType = ClfsMgmtPolicyNewContainerSize;
+ logPolicy.PolicyParameters.NewContainerSize.SizeInBytes = containerSize;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
+ ULONGLONG desired(params.containers), actual(0);
+ ok = ::SetLogFileSizeWithPolicy(handle, &desired, &actual);
QPID_WINDOWS_CHECK_NOT(ok, 0);
+
needInitialize = true;
}
+ // Ensure that the log is extended as needed and will shrink when 50%
+ // becomes unused.
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoGrow;
+ logPolicy.PolicyParameters.AutoGrow.Enabled = 1;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+ logPolicy.PolicyType = ClfsMgmtPolicyAutoShrink;
+ logPolicy.PolicyParameters.AutoShrink.Percentage = params.shrinkPct;
+ ok = ::InstallLogPolicy(handle, &logPolicy);
+ QPID_WINDOWS_CHECK_NOT(ok, 0);
+
// Need a marshaling area
ok = ::CreateLogMarshallingArea(handle,
NULL, NULL, NULL, // Alloc, free, context
@@ -131,7 +147,7 @@ Log::write(void* entry, uint32_t length, CLFS_LSN* prev)
&desc, 1, // Buffer descriptor
0, prev, // Undo-Next, Prev
0, 0, // Reservation
- CLFS_FLAG_FORCE_FLUSH, // CLFS_FLAGS_NO_FLAGS
+ CLFS_FLAG_FORCE_FLUSH,
&lsn,
0);
QPID_WINDOWS_CHECK_NOT(ok, 0);
@@ -155,6 +171,11 @@ Log::moveTail(const CLFS_LSN& oldest)
BOOL ok = ::AdvanceLogBase(marshal,
const_cast<PCLFS_LSN>(&oldest),
0, NULL);
+ // If multiple threads are manipulating things they may get out of
+ // order when moving the tail; if someone already moved it further
+ // than this, it's ok - ignore it.
+ if (ok || ::GetLastError() == ERROR_LOG_START_OF_LOG)
+ return;
QPID_WINDOWS_CHECK_NOT(ok, 0);
}
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/Log.h b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
index dda41a3bac..2f7eb6cada 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/Log.h
+++ b/qpid/cpp/src/qpid/store/ms-clfs/Log.h
@@ -53,6 +53,7 @@ public:
struct TuningParameters {
size_t containerSize;
unsigned short containers;
+ unsigned short shrinkPct;
uint32_t maxWriteBuffers;
};
diff --git a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
index a4d1efe008..586aaaf980 100644
--- a/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
+++ b/qpid/cpp/src/qpid/store/ms-clfs/MSSqlClfsProvider.cpp
@@ -400,6 +400,7 @@ MSSqlClfsProvider::earlyInitialize(Plugin::Target &target)
Log::TuningParameters params;
params.containerSize = options.containerSize;
params.containers = options.initialContainers;
+ params.shrinkPct = 50;
params.maxWriteBuffers = options.maxWriteBuffers;
std::string msgPath = options.storeDir + "\\" + "messages";
messages.openLog(msgPath, params);