public class JournalStorageManager extends Object implements StorageManager
Using this class also ensures that locks are acquired in the right order, avoiding dead-locks.
Note that turning replication on and off (on the live server side) is _mostly_ a matter of
using ReplicatedJournals instead of regular JournalImpl instances, and synchronizing the existing
data. For details, see the Javadoc of
startReplication(ReplicationManager, PagingManager, String, boolean).
StorageManager.LargeMessageExtension

| Constructor and Description |
|---|
| JournalStorageManager(Configuration config, ExecutorFactory executorFactory) |
| JournalStorageManager(Configuration config, ExecutorFactory executorFactory, IOCriticalErrorListener criticalErrorListener) |

| Modifier and Type | Method and Description |
|---|---|
| void | addBytesToLargeMessage(SequentialFile file, long messageId, byte[] bytes) |
| void | addGrouping(GroupBinding groupBinding) |
| void | addQueueBinding(long tx, Binding binding) |
| boolean | addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx)<br>Write message to page if we are paging. |
| void | afterCompleteOperations(IOAsyncTask run) |
| void | afterPageRead()<br>We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading at the same time. |
| ByteBuffer | allocateDirectBuffer(int size)<br>AIO has an optimized buffer with a method to release it explicitly, whereas NIO releases data based on GC. |
| void | beforePageRead()<br>We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading at the same time. |
| void | clearContext() |
| void | commit(long txID) |
| void | commit(long txID, boolean lineUpContext) |
| void | commitBindings(long txID) |
| void | confirmPendingLargeMessage(long recordID)<br>We don't need messageID now but we are likely to need it if we ever decide to support a database. |
| void | confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID)<br>Confirms that a large message was finished. |
| SequentialFile | createFileForLargeMessage(long messageID, StorageManager.LargeMessageExtension extension)<br>Instantiates a SequentialFile to be used for storing a LargeServerMessage. |
| LargeServerMessage | createLargeMessage() |
| LargeServerMessage | createLargeMessage(long id, MessageInternal message)<br>Creates a new LargeMessage with the given id. |
| void | deleteAddressSetting(SimpleString addressMatch) |
| void | deleteCursorAcknowledge(long ackID) |
| void | deleteCursorAcknowledgeTransactional(long txID, long ackID) |
| void | deleteDuplicateID(long recordID) |
| void | deleteDuplicateIDTransactional(long txID, long recordID) |
| void | deleteGrouping(GroupBinding groupBinding) |
| void | deleteHeuristicCompletion(long id) |
| void | deleteIncrementRecord(long txID, long recordID) |
| void | deleteMessage(long messageID) |
| void | deletePageComplete(long ackID) |
| void | deletePageCounter(long txID, long recordID) |
| void | deletePageTransactional(long recordID) |
| void | deleteQueueBinding(long tx, long queueBindingID) |
| void | deleteSecurityRoles(SimpleString addressMatch) |
| void | freeDirectBuffer(ByteBuffer buffer)<br>AIO has an optimized buffer which has a method to release it instead of the way NIO will release data based on GC. |
| long | generateUniqueID() |
| Journal | getBindingsJournal() |
| OperationContext | getContext()<br>Get the context associated with the thread for later reuse. |
| long | getCurrentUniqueID() |
| Journal | getMessageJournal() |
| Executor | getSingleThreadExecutor() |
| boolean | isReplicated() |
| boolean | isStarted() |
| void | lineUpContext() |
| JournalLoadInformation | loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) |
| JournalLoadInformation[] | loadInternalOnly()<br>TODO: Is this still being used? |
| JournalLoadInformation | loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,Queue> queues, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages) |
| static String | md5(File file) |
| protected static JournalStorageManager.PersistentQueueBindingEncoding | newBindingEncoding(long id, HornetQBuffer buffer) |
| OperationContext | newContext(Executor executor1)<br>It just creates an OperationContext without associating it. |
| protected static PersistedRoles | newSecurityRecord(long id, HornetQBuffer buffer) |
| OperationContext | newSingleThreadContext() |
| void | pageClosed(SimpleString storeName, int pageNumber) |
| void | pageDeleted(SimpleString storeName, int pageNumber) |
| void | pageWrite(PagedMessage message, int pageNumber) |
| void | persistIdGenerator()<br>Closes the IDGenerator persisting the current record ID. |
| void | prepare(long txID, Xid xid) |
| void | readLock()<br>Read lock the StorageManager. |
| void | readUnLock()<br>Unlock the manager. |
| List<PersistedAddressSetting> | recoverAddressSettings() |
| List<PersistedRoles> | recoverPersistedRoles() |
| void | rollback(long txID) |
| void | rollbackBindings(long txID) |
| void | setContext(OperationContext context)<br>Set the context back to the thread. |
| void | start() |
| void | startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, boolean autoFailBack)<br>Starts replication at the live-server side. |
| void | stop() |
| void | stop(boolean ioCriticalError) |
| void | stopReplication()<br>Stops replication by resetting replication-related fields to their 'unreplicated' state. |
| void | storeAcknowledge(long queueID, long messageID) |
| void | storeAcknowledgeTransactional(long txID, long queueID, long messageID) |
| void | storeAddressSetting(PersistedAddressSetting addressSetting) |
| void | storeCursorAcknowledge(long queueID, PagePosition position) |
| void | storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) |
| void | storeDuplicateID(SimpleString address, byte[] duplID, long recordID) |
| void | storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) |
| long | storeHeuristicCompletion(Xid xid, boolean isCommit) |
| void | storeID(long journalID, long id)<br>Stores the given journalID in the bindingsJournal. |
| void | storeMessage(ServerMessage message) |
| void | storeMessageTransactional(long txID, ServerMessage message) |
| void | storePageCompleteTransactional(long txID, long queueID, PagePosition position) |
| long | storePageCounter(long txID, long queueID, long value) |
| long | storePageCounterInc(long queueID, int value) |
| long | storePageCounterInc(long txID, long queueID, int value) |
| void | storePageTransaction(long txID, PageTransactionInfo pageTransaction) |
| long | storePendingLargeMessage(long messageID) |
| void | storeReference(long queueID, long messageID, boolean last) |
| void | storeReferenceTransactional(long txID, long queueID, long messageID) |
| void | storeSecurityRoles(PersistedRoles persistedRoles) |
| void | updateDeliveryCount(MessageReference ref) |
| void | updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) |
| void | updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) |
| void | updatePageTransaction(PageTransactionInfo pageTransaction, int depages)<br>FIXME Unused |
| void | updateScheduledDeliveryTime(MessageReference ref) |
| void | updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) |
| void | waitOnOperations()<br>Block until the operations are done. |
| boolean | waitOnOperations(long timeout)<br>Block until the operations are done. |
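
The beforePageRead() / afterPageRead() pair listed above is described as a safeguard against too much concurrent paging IO. One common way to build such a safeguard is a counting semaphore. The following is only a minimal sketch under that assumption: the class `PageReadLimiter`, its constructor argument, and `availableReads()` are hypothetical names for illustration and are not part of this API, and the real methods may be implemented differently.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical helper (not part of HornetQ): bounds the number of concurrent page reads. */
public class PageReadLimiter {
    private final Semaphore permits;

    public PageReadLimiter(int maxConcurrentReads) {
        this.permits = new Semaphore(maxConcurrentReads);
    }

    /** Call before reading a page; blocks while the limit is reached. */
    public void beforePageRead() {
        permits.acquireUninterruptibly();
    }

    /** Call after the read finishes, typically from a finally block. */
    public void afterPageRead() {
        permits.release();
    }

    /** Number of read slots that are currently free. */
    public int availableReads() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        PageReadLimiter limiter = new PageReadLimiter(2);
        limiter.beforePageRead();
        try {
            System.out.println("free slots while reading: " + limiter.availableReads());
        } finally {
            limiter.afterPageRead();
        }
    }
}
```

Releasing the permit in a finally block keeps a failed read from permanently shrinking the pool of read slots.
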
public JournalStorageManager(Configuration config, ExecutorFactory executorFactory)
public JournalStorageManager(Configuration config, ExecutorFactory executorFactory, IOCriticalErrorListener criticalErrorListener)
public void clearContext()
Specified by: clearContext in interface StorageManager

public boolean isReplicated()

public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager, String nodeID, boolean autoFailBack) throws Exception
Starts replication at the live-server side.
In practice that means two things:
(1) all currently existing data must be sent to the backup;
(2) all new persistent information is replicated (sent) to the backup.
To achieve (1), we lock the entire journal while collecting the list of files to send to the backup. The journal does not remain locked during the actual synchronization.
To achieve (2), instead of writing directly to instances of JournalImpl, we write to instances of ReplicatedJournal.
At the backup side, replication is handled by ReplicationEndpoint.
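
Point (2) above amounts to wrapping the journal so that every write is also forwarded to the backup. The sketch below illustrates that wrapping idea only. `SimpleJournal`, `LocalJournal` and `ReplicatingJournal` are hypothetical, heavily reduced stand-ins and not the real Journal, JournalImpl or ReplicatedJournal types; the order of "send to backup" and "write locally" is an arbitrary choice in this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicationSketch {
    /** Hypothetical, heavily reduced stand-in for a journal API. */
    public interface SimpleJournal {
        void appendRecord(long id, byte[] data);
    }

    /** Plain journal: only remembers the IDs it was asked to store. */
    public static class LocalJournal implements SimpleJournal {
        public final List<Long> ids = new ArrayList<>();

        @Override
        public void appendRecord(long id, byte[] data) {
            ids.add(id);
        }
    }

    /** Wrapper: records what would be sent to the backup, then delegates to the local journal. */
    public static class ReplicatingJournal implements SimpleJournal {
        private final SimpleJournal local;
        public final List<Long> sentToBackup = new ArrayList<>();

        public ReplicatingJournal(SimpleJournal local) {
            this.local = local;
        }

        @Override
        public void appendRecord(long id, byte[] data) {
            sentToBackup.add(id);          // stand-in for a network send
            local.appendRecord(id, data);  // the local write still happens
        }
    }

    public static void main(String[] args) {
        LocalJournal local = new LocalJournal();
        SimpleJournal journal = local;                // replication off
        journal.appendRecord(1L, new byte[0]);

        ReplicatingJournal replicated = new ReplicatingJournal(local);
        journal = replicated;                         // replication on: swap the reference
        journal.appendRecord(2L, new byte[0]);

        System.out.println("local records: " + local.ids.size());
        System.out.println("replicated records: " + replicated.sentToBackup.size());
    }
}
```

Because callers only hold the interface reference, turning replication on or off reduces to swapping which implementation that reference points to. Record 1 in the example was written before the swap, which is why existing data still needs the separate synchronization step described in (1).
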
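Specified by: startReplication in interface StorageManager
Parameters:
replicationManager -
pagingManager -
Throws: HornetQException, Exception
See Also: startReplication(ReplicationManager, PagingManager, String, boolean)

public void stopReplication()
Stops replication by resetting replication-related fields to their 'unreplicated' state.
Specified by: stopReplication in interface StorageManager

public final void waitOnOperations() throws Exception
Description copied from interface: StorageManager
Block until the operations are done.
Specified by: waitOnOperations in interface StorageManager
Throws: Exception

public final boolean waitOnOperations(long timeout) throws Exception
Description copied from interface: StorageManager
Block until the operations are done.
Specified by: waitOnOperations in interface StorageManager
Throws: Exception

public void pageClosed(SimpleString storeName, int pageNumber)
Specified by: pageClosed in interface StorageManager

public void pageDeleted(SimpleString storeName, int pageNumber)
Specified by: pageDeleted in interface StorageManager

public void pageWrite(PagedMessage message, int pageNumber)
Specified by: pageWrite in interface StorageManager

public OperationContext getContext()
Description copied from interface: StorageManager
Get the context associated with the thread for later reuse.
Specified by: getContext in interface StorageManager

public void setContext(OperationContext context)
Description copied from interface: StorageManager
Set the context back to the thread.
Specified by: setContext in interface StorageManager

public Executor getSingleThreadExecutor()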
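
The getContext(), setContext(OperationContext) and clearContext() methods describe a context that is associated with the calling thread. A thread-local holder is one straightforward way to get that behaviour. The sketch below assumes such a design: `ContextHolderSketch` and its nested `Context` class are hypothetical stand-ins and not the real OperationContext, and the lazy creation inside `getContext()` is an assumption of this sketch.

```java
public class ContextHolderSketch {
    /** Hypothetical stand-in for OperationContext. */
    public static final class Context {
        public final String owner;

        public Context(String owner) {
            this.owner = owner;
        }
    }

    // One context slot per thread.
    private static final ThreadLocal<Context> CURRENT = new ThreadLocal<>();

    /** Returns the calling thread's context, creating one on first use (assumption). */
    public static Context getContext() {
        Context ctx = CURRENT.get();
        if (ctx == null) {
            ctx = new Context(Thread.currentThread().getName());
            CURRENT.set(ctx);
        }
        return ctx;
    }

    /** Puts a previously obtained context back on the calling thread. */
    public static void setContext(Context ctx) {
        CURRENT.set(ctx);
    }

    /** Detaches whatever context the calling thread currently has. */
    public static void clearContext() {
        CURRENT.remove();
    }

    public static void main(String[] args) {
        Context saved = getContext();   // keep for later reuse
        clearContext();                 // thread no longer has a context
        setContext(saved);              // restore the saved one
        System.out.println("restored: " + (getContext() == saved));
    }
}
```
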
public OperationContext newSingleThreadContext()
Specified by: newSingleThreadContext in interface StorageManager

public OperationContext newContext(Executor executor1)
Description copied from interface: StorageManager
It just creates an OperationContext without associating it.
Specified by: newContext in interface StorageManager

public void afterCompleteOperations(IOAsyncTask run)
Specified by: afterCompleteOperations in interface StorageManager

public long generateUniqueID()
Specified by: generateUniqueID in interface StorageManager

public long getCurrentUniqueID()
Specified by: getCurrentUniqueID in interface StorageManager

public LargeServerMessage createLargeMessage()
Specified by: createLargeMessage in interface StorageManager

public final void addBytesToLargeMessage(SequentialFile file, long messageId, byte[] bytes) throws Exception
Specified by: addBytesToLargeMessage in interface StorageManager
Throws: Exception

public LargeServerMessage createLargeMessage(long id, MessageInternal message) throws Exception
Description copied from interface: StorageManager
Creates a new LargeMessage with the given id.
Specified by: createLargeMessage in interface StorageManager
Parameters:
message - This is a temporary message that holds the parsed properties. The remoting layer can't create a ServerMessage directly, so this will be replaced.
Throws: Exception

public long storePendingLargeMessage(long messageID) throws Exception
Throws: Exception

public void confirmPendingLargeMessageTX(Transaction tx, long messageID, long recordID) throws Exception
Description copied from interface: StorageManager
Confirms that a large message was finished.
Specified by: confirmPendingLargeMessageTX in interface StorageManager
Throws: Exception

public void confirmPendingLargeMessage(long recordID) throws Exception
We don't need messageID now but we are likely to need it if we ever decide to support a database.
Specified by: confirmPendingLargeMessage in interface StorageManager
Throws: Exception

public void storeMessage(ServerMessage message) throws Exception
Specified by: storeMessage in interface StorageManager
Throws: Exception

public void storeReference(long queueID, long messageID, boolean last) throws Exception
Specified by: storeReference in interface StorageManager
Throws: Exception

public void readLock()
Description copied from interface: StorageManager
Read lock the StorageManager. The main lock is used to write lock the whole manager when starting replication. Sub-systems, say Paging classes, that use locks of their own AND also write through the StorageManager MUST first read lock the StorageManager before taking their own locks. Otherwise, we may dead-lock when starting replication sync.
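
The ordering rule above (storage manager lock first, sub-system lock second) can be illustrated with standard java.util.concurrent locks. This is a minimal sketch, not the actual implementation: `LockOrderSketch`, its fields and its methods are hypothetical names, and a ReentrantReadWriteLock is assumed for the main lock purely for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockOrderSketch {
    // Stand-in for the storage manager's main lock.
    private final ReentrantReadWriteLock storageLock = new ReentrantReadWriteLock();
    // Stand-in for a sub-system's own lock (for example, a paging store).
    private final ReentrantLock pagingLock = new ReentrantLock();
    private int pagedWrites;

    /** Sub-system write path: read lock the storage manager first, then take the own lock. */
    public void subsystemWrite() {
        storageLock.readLock().lock();
        try {
            pagingLock.lock();
            try {
                pagedWrites++;
            } finally {
                pagingLock.unlock();
            }
        } finally {
            storageLock.readLock().unlock();
        }
    }

    /** Replication start: write lock the whole manager, then inspect sub-system state. */
    public int startReplicationSync() {
        storageLock.writeLock().lock();
        try {
            pagingLock.lock();
            try {
                return pagedWrites;
            } finally {
                pagingLock.unlock();
            }
        } finally {
            storageLock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        LockOrderSketch sketch = new LockOrderSketch();
        sketch.subsystemWrite();
        sketch.subsystemWrite();
        System.out.println("writes seen at sync start: " + sketch.startReplicationSync());
    }
}
```

Both paths acquire the two locks in the same order. If `subsystemWrite()` took `pagingLock` first and only then asked for the read lock, it could hold `pagingLock` while waiting for a writer that is itself waiting for `pagingLock`, which is the dead-lock the description warns about.
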
Specified by: readLock in interface StorageManager

public void readUnLock()
Description copied from interface: StorageManager
Unlock the manager.
Specified by: readUnLock in interface StorageManager
See Also: StorageManager.readLock()

public void storeAcknowledge(long queueID, long messageID) throws Exception
Specified by: storeAcknowledge in interface StorageManager
Throws: Exception

public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
Specified by: storeCursorAcknowledge in interface StorageManager
Throws: Exception

public void deleteMessage(long messageID) throws Exception
Specified by: deleteMessage in interface StorageManager
Throws: Exception

public void updateScheduledDeliveryTime(MessageReference ref) throws Exception
Specified by: updateScheduledDeliveryTime in interface StorageManager
Throws: Exception

public void storeDuplicateID(SimpleString address, byte[] duplID, long recordID) throws Exception
Specified by: storeDuplicateID in interface StorageManager
Throws: Exception

public void deleteDuplicateID(long recordID) throws Exception
Specified by: deleteDuplicateID in interface StorageManager
Throws: Exception

public void storeMessageTransactional(long txID, ServerMessage message) throws Exception
Specified by: storeMessageTransactional in interface StorageManager
Throws: Exception

public void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception
Specified by: storePageTransaction in interface StorageManager
Throws: Exception

public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depages) throws Exception
Specified by: updatePageTransaction in interface StorageManager
Throws: Exception

public void updatePageTransaction(PageTransactionInfo pageTransaction, int depages) throws Exception
Description copied from interface: StorageManager
FIXME Unused
Specified by: updatePageTransaction in interface StorageManager
Throws: Exception

public void storeReferenceTransactional(long txID, long queueID, long messageID) throws Exception
Specified by: storeReferenceTransactional in interface StorageManager
Throws: Exception

public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
Specified by: storeAcknowledgeTransactional in interface StorageManager
Throws: Exception

public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
Specified by: storeCursorAcknowledgeTransactional in interface StorageManager
Throws: Exception

public void storePageCompleteTransactional(long txID, long queueID, PagePosition position) throws Exception
Specified by: storePageCompleteTransactional in interface StorageManager
Throws: Exception

public void deletePageComplete(long ackID) throws Exception
Specified by: deletePageComplete in interface StorageManager
Throws: Exception

public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
Specified by: deleteCursorAcknowledgeTransactional in interface StorageManager
Throws: Exception

public void deleteCursorAcknowledge(long ackID) throws Exception
Specified by: deleteCursorAcknowledge in interface StorageManager
Throws: Exception

public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
Specified by: storeHeuristicCompletion in interface StorageManager
Throws: Exception

public void deleteHeuristicCompletion(long id) throws Exception
Specified by: deleteHeuristicCompletion in interface StorageManager
Throws: Exception

public void deletePageTransactional(long recordID) throws Exception
Specified by: deletePageTransactional in interface StorageManager
Throws: Exception

public void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception
Specified by: updateScheduledDeliveryTimeTransactional in interface StorageManager
Throws: Exception

public void prepare(long txID, Xid xid) throws Exception
Specified by: prepare in interface StorageManager
Throws: Exception

public void commit(long txID) throws Exception
Specified by: commit in interface StorageManager
Throws: Exception

public void commitBindings(long txID) throws Exception
Specified by: commitBindings in interface StorageManager
Throws: Exception

public void rollbackBindings(long txID) throws Exception
Specified by: rollbackBindings in interface StorageManager
Throws: Exception

public void commit(long txID, boolean lineUpContext) throws Exception
Specified by: commit in interface StorageManager
Throws: Exception

public void rollback(long txID) throws Exception
Specified by: rollback in interface StorageManager
Throws: Exception

public void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
Specified by: storeDuplicateIDTransactional in interface StorageManager
Throws: Exception

public void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID, long recordID) throws Exception
Specified by: updateDuplicateIDTransactional in interface StorageManager
Throws: Exception

public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
Specified by: deleteDuplicateIDTransactional in interface StorageManager
Throws: Exception

public void updateDeliveryCount(MessageReference ref) throws Exception
Specified by: updateDeliveryCount in interface StorageManager
Throws: Exception

public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
Specified by: storeAddressSetting in interface StorageManager
Throws: Exception

public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
Specified by: recoverAddressSettings in interface StorageManager
Throws: Exception

public List<PersistedRoles> recoverPersistedRoles() throws Exception
Specified by: recoverPersistedRoles in interface StorageManager
Throws: Exception

public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
Specified by: storeSecurityRoles in interface StorageManager
Throws: Exception

public final void storeID(long journalID, long id) throws Exception
Description copied from interface: StorageManager
Stores the given journalID in the bindingsJournal.
Specified by: storeID in interface StorageManager
Throws: Exception

public void deleteAddressSetting(SimpleString addressMatch) throws Exception
Specified by: deleteAddressSetting in interface StorageManager
Throws: Exception

public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
Specified by: deleteSecurityRoles in interface StorageManager
Throws: Exception

public JournalLoadInformation loadMessageJournal(PostOffice postOffice, PagingManager pagingManager, ResourceManager resourceManager, Map<Long,Queue> queues, Map<Long,QueueBindingInfo> queueInfos, Map<SimpleString,List<Pair<byte[],Long>>> duplicateIDMap, Set<Pair<Long,Long>> pendingLargeMessages) throws Exception
Specified by: loadMessageJournal in interface StorageManager
Throws: Exception

public void addGrouping(GroupBinding groupBinding) throws Exception
Specified by: addGrouping in interface StorageManager
Throws: Exception

public void deleteGrouping(GroupBinding groupBinding) throws Exception
Specified by: deleteGrouping in interface StorageManager
Throws: Exception

public void addQueueBinding(long tx, Binding binding) throws Exception
Specified by: addQueueBinding in interface StorageManager
Throws: Exception

public void deleteQueueBinding(long tx, long queueBindingID) throws Exception
Specified by: deleteQueueBinding in interface StorageManager
Throws: Exception

public long storePageCounterInc(long txID, long queueID, int value) throws Exception
Specified by: storePageCounterInc in interface StorageManager
Throws: Exception

public long storePageCounterInc(long queueID, int value) throws Exception
Specified by: storePageCounterInc in interface StorageManager
Throws: Exception

public long storePageCounter(long txID, long queueID, long value) throws Exception
Specified by: storePageCounter in interface StorageManager
Throws: Exception

public void deleteIncrementRecord(long txID, long recordID) throws Exception
Specified by: deleteIncrementRecord in interface StorageManager
Throws: Exception

public void deletePageCounter(long txID, long recordID) throws Exception
Specified by: deletePageCounter in interface StorageManager
Throws: Exception

public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
Specified by: loadBindingJournal in interface StorageManager
Throws: Exception

public void lineUpContext()
Specified by: lineUpContext in interface StorageManager

public void start() throws Exception
Specified by: start in interface HornetQComponent
Throws: Exception

public void stop() throws Exception
Specified by: stop in interface HornetQComponent
Throws: Exception

public void persistIdGenerator()
Description copied from interface: StorageManager
Closes the IDGenerator persisting the current record ID.
Effectively a "pre-stop" method. Necessary due to the "stop"-order at HornetQServerImpl.
Specified by: persistIdGenerator in interface StorageManager

public void stop(boolean ioCriticalError) throws Exception
Specified by: stop in interface StorageManager
Parameters:
ioCriticalError - is the server being stopped due to an IO critical error
Throws: Exception

public boolean isStarted()
Specified by: isStarted in interface HornetQComponent

public JournalLoadInformation[] loadInternalOnly() throws Exception
TODO: Is this still being used?
Throws: Exception

public void beforePageRead() throws Exception
Description copied from interface: StorageManager
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading at the same time.
Specified by: beforePageRead in interface StorageManager
Throws: Exception

public void afterPageRead() throws Exception
Description copied from interface: StorageManager
We need a safeguard in place to avoid too much concurrent IO happening on Paging, otherwise the system may become unresponsive if too many destinations are reading at the same time.
Specified by: afterPageRead in interface StorageManager
Throws: Exception

public ByteBuffer allocateDirectBuffer(int size)
Description copied from interface: StorageManager
AIO has an optimized buffer which has a method to release it instead of the way NIO will release data based on GC.
Specified by: allocateDirectBuffer in interface StorageManager

public void freeDirectBuffer(ByteBuffer buffer)
Description copied from interface: StorageManager
AIO has an optimized buffer which has a method to release it instead of the way NIO will release data based on GC.
Specified by: freeDirectBuffer in interface StorageManager

public Journal getMessageJournal()
Specified by: getMessageJournal in interface StorageManager

public Journal getBindingsJournal()
Specified by: getBindingsJournal in interface StorageManager

public SequentialFile createFileForLargeMessage(long messageID, StorageManager.LargeMessageExtension extension)
Description copied from interface: StorageManager
Instantiates a SequentialFile to be used for storing a LargeServerMessage.
Specified by: createFileForLargeMessage in interface StorageManager
Parameters:
messageID - the id of the message
extension - the extension to add to the file

protected static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
Parameters:
id -
buffer -

protected static JournalStorageManager.PersistentQueueBindingEncoding newBindingEncoding(long id, HornetQBuffer buffer)
Parameters:
id -
buffer -

public boolean addToPage(PagingStore store, ServerMessage msg, Transaction tx, RouteContextList listCtx) throws Exception
Description copied from interface: StorageManager
Write message to page if we are paging.
This is primarily a PagingStore call, but as with any other call writing persistent data, it must go through here, both for the sake of replication and to ensure that it takes the locks (storage manager and pagingStore) in the right order, thus avoiding dead-locks.
Specified by: addToPage in interface StorageManager
Returns: true if we are paging and have handled the data, false if the data needs to be sent to the journal
Throws: Exception

Copyright © 2013 JBoss, a division of Red Hat. All rights reserved.