An Apache Geode AsyncEventQueue asynchronously processes events after they have been applied to a Region. AsyncEventQueues are normally used to replay Region events into a relational database or other remote data store. Some use cases, however, only want to process events asynchronously and in parallel, without actually storing entries in a Region. In these cases, each event just needs to be routed directly to the AsyncEventQueue.
This behavior is effectively possible with serial AsyncEventQueues and replicated Regions. If every server defines the Region as REPLICATE_PROXY, operations on that Region are still allowed: events go through the Region without being applied to it and are delivered to the serial AsyncEventQueue. The same cannot be done with parallel AsyncEventQueues and partitioned Regions. If every server defines the Region as PARTITION_PROXY, an operation on that Region fails with a PartitionedRegionStorageException.
This article shows how to route events directly to a parallel AsyncEventQueue using Functions.
Normally, events are delivered to AsyncEventQueues as a result of Region operations like create, update and destroy. Here is a simplified diagram of that architecture:
An alternate architecture bypasses the Region operation and replication in the diagram above and instead uses Function invocations to route the data between the primary and redundant servers. Here is a simplified diagram of that architecture:
All of the code for this example is here.
The implementation consists mainly of a PrimaryRoutingFunction and a SecondaryRoutingFunction.
The PrimaryRoutingFunction:
The SecondaryRoutingFunction:
The BaseFunction provides methods common to both functions.
The RoutingAsyncEventListener is an AsyncEventListener that processes AsyncEvents by logging and counting them.
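To make "logging and counting" concrete, here is a minimal sketch of such a listener, written against plain Java only so that it runs without Geode. SketchAsyncEvent and CountingListenerSketch are hypothetical names introduced for illustration; they stand in for Geode's AsyncEvent and for the article's RoutingAsyncEventListener, whose actual code may differ. The one detail taken from the Geode API is the shape of processEvents: it receives a batch as a list and returns true to report that the batch was processed.

```java
import java.util.List;

// Hypothetical stand-in for Geode's AsyncEvent: only the three fields logged here.
final class SketchAsyncEvent {
  final Object key;
  final Object value;
  final boolean possibleDuplicate;

  SketchAsyncEvent(Object key, Object value, boolean possibleDuplicate) {
    this.key = key;
    this.value = value;
    this.possibleDuplicate = possibleDuplicate;
  }
}

// Hypothetical model of a listener that logs and counts each batch.
final class CountingListenerSketch {
  private int total;
  private int possibleDuplicates;

  // Same shape as AsyncEventListener.processEvents: returning true reports
  // the batch as processed so the queue can discard it.
  synchronized boolean processEvents(List<SketchAsyncEvent> events) {
    StringBuilder details = new StringBuilder();
    for (SketchAsyncEvent event : events) {
      total++;
      if (event.possibleDuplicate) {
        possibleDuplicates++;
      }
      details.append("\n\tkey=").append(event.key)
          .append("; value=").append(event.value)
          .append("; possibleDuplicate=").append(event.possibleDuplicate);
    }
    System.out.println("CountingListenerSketch: Processing " + events.size()
        + " events (total=" + total + "; possibleDuplicate=" + possibleDuplicates + ")"
        + details);
    return true;
  }

  synchronized int getTotal() {
    return total;
  }

  synchronized int getPossibleDuplicates() {
    return possibleDuplicates;
  }
}
```

The counters are cumulative across batches, which is how the total and possibleDuplicate figures in a log line can exceed the number of events in that batch.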
An EntryEventImpl is created by both functions. It represents the Region operation and is delivered to the AsyncEventQueue.
The createEvent method:
protected EntryEventImpl createEvent(PartitionedRegion pr, Object key, Object value,
    EventID eventId, long tailKey) {
  // Create the EntryEventImpl
  EntryEventImpl event = this.entryEventFactory
      .create(pr, Operation.CREATE, key, value, null, true, pr.getCache().getMyId(), false, eventId);
  // Set the event's tail key. If the input tailKey == -1, then this is the primary.
  // The tailKey is set directly in the event in the secondary.
  if (tailKey != -1) {
    event.setTailKey(tailKey);
  }
  // The tailKey is set by handleWANEvent in the event in the primary.
  // handleWANEvent also updates the BucketRegion's most recent tail key.
  pr.getBucketRegion(key).handleWANEvent(event);
  return event;
}
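The two tail key paths described in the comments above can be modeled in a few lines of plain Java. This is only a sketch under stated assumptions, not Geode's implementation: TailKeyBucketSketch is a hypothetical class, the increment of 1 is a simplification (the real tail key sequence is not modeled), and the rule that the recorded tail key only moves forward is an assumption made for the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of one bucket's tail key bookkeeping.
final class TailKeyBucketSketch {
  static final long UNSET = -1L;

  // Most recent tail key this bucket has seen or generated.
  private final AtomicLong latestTailKey = new AtomicLong(0);

  // Returns the tail key the event ends up carrying.
  long handleEvent(long eventTailKey) {
    if (eventTailKey == UNSET) {
      // Primary path: the bucket generates the next tail key for the event.
      // Assumption: a step of 1; the real sequence is not modeled here.
      return latestTailKey.incrementAndGet();
    }
    // Secondary path: the event keeps the tail key chosen by the primary and
    // the bucket records it. Assumption: the record only moves forward.
    latestTailKey.accumulateAndGet(eventTailKey, Math::max);
    return eventTailKey;
  }

  long getLatestTailKey() {
    return latestTailKey.get();
  }
}
```

In this model the primary calls handleEvent(TailKeyBucketSketch.UNSET) and forwards the returned value, and each secondary calls handleEvent with that value, so both copies of the event end up with the same tail key.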
The PrimaryRoutingFunction invokes the SecondaryRoutingFunction on any redundant DistributedMembers. This invocation replaces the Region event replication in the normal usage. The routeToSecondaryMembers method:
private void routeToSecondaryMembers(PartitionedRegion pr, EntryEventImpl event) {
  // Get the redundant members for this key
  Set<DistributedMember> redundantMembers = PartitionRegionHelper
      .getRedundantMembersForKey(pr, event.getKey());
  // Create and execute the SecondaryRoutingFunction on those members if necessary
  if (!redundantMembers.isEmpty()) {
    Object[] args = new Object[] {
        pr.getFullPath(), event.getKey(), event.getValue(), event.getEventId(), event.getTailKey()
    };
    try {
      FunctionService.onMembers(redundantMembers)
          .setArguments(args)
          .execute("SecondaryRoutingFunction")
          .getResult();
    } catch (FunctionException e) {
      // If the remote member is killed, a FunctionException is thrown. Log a warning and continue.
      pr.getCache().getLogger()
          .warning("PrimaryRoutingFunction caught exception executing SecondaryRoutingFunction for key="
              + event.getKey() + "; value=" + event.getNewValue(), e);
    }
  }
}
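The "log a warning and continue" behavior of this method can be illustrated without Geode. The sketch below is a simplified model and makes one deliberate change: it calls each secondary in turn and catches failures per member, whereas the method above issues a single onMembers call and catches one FunctionException for the whole invocation. SecondaryMemberSketch and SecondaryFanOutSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for invoking the secondary function on one member.
interface SecondaryMemberSketch {
  void route(Object key, Object value, long tailKey);
}

final class SecondaryFanOutSketch {
  // Sends the event to every secondary and returns the warnings that were
  // recorded. A failing member does not stop delivery to the others.
  static List<String> routeToSecondaries(List<SecondaryMemberSketch> secondaries,
      Object key, Object value, long tailKey) {
    List<String> warnings = new ArrayList<>();
    for (SecondaryMemberSketch secondary : secondaries) {
      try {
        secondary.route(key, value, tailKey);
      } catch (RuntimeException e) {
        // Model of the warning path: record the failure and keep going.
        warnings.add("caught exception routing key=" + key + ": " + e.getMessage());
      }
    }
    return warnings;
  }
}
```

The design point carried over from the original is that a lost secondary is treated as a warning, not as a failure of the primary's own processing.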
The EntryEventImpl is delivered to the AsyncEventQueue by both functions. The deliverToAsyncEventQueue method:
protected void deliverToAsyncEventQueue(EntryEventImpl event) {
  // Get the AsyncEventQueue
  String queueId = (String) event.getRegion().getAsyncEventQueueIds().iterator().next();
  AsyncEventQueueImpl queue = (AsyncEventQueueImpl) event.getRegion()
      .getCache().getAsyncEventQueue(queueId);
  // Get the GatewaySender
  AbstractGatewaySender sender = (AbstractGatewaySender) queue.getSender();
  // Distribute the EntryEvent to the GatewaySender
  sender.distribute(EnumListenerEvent.AFTER_CREATE, event, REMOTE_DS_IDS);
}
In normal usage, the PrimaryRoutingFunction and RoutingAsyncEventListener in the primary server will log messages like:
[info 2020/08/23 13:12:13.750 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=2}, EventID[id=31 bytes;threadID=1;sequenceID=2]]
[info 2020/08/23 13:12:13.762 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=3}, EventID[id=31 bytes;threadID=1;sequenceID=3]]
[info 2020/08/23 13:12:13.789 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=8}, EventID[id=31 bytes;threadID=1;sequenceID=8]]
[info 2020/08/23 13:12:13.797 HST <ServerConnection on port 51965 Thread 1> tid=0x5b] PrimaryRoutingFunction processing args= [PDX[14273398,example.client.domain.Trade]{id=9}, EventID[id=31 bytes;threadID=1;sequenceID=9]]
[info 2020/08/23 13:12:14.412 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x47] RoutingAsyncEventListener: Processing 4 events (total=4; possibleDuplicate=0)
key=2; value=PDX[14273398,example.client.domain.Trade]{id=2}; possibleDuplicate=false
key=3; value=PDX[14273398,example.client.domain.Trade]{id=3}; possibleDuplicate=false
key=8; value=PDX[14273398,example.client.domain.Trade]{id=8}; possibleDuplicate=false
key=9; value=PDX[14273398,example.client.domain.Trade]{id=9}; possibleDuplicate=false
The SecondaryRoutingFunction in the secondary servers will log messages like:
[info 2020/08/23 13:12:13.756 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 2, PDX[14273398,example.client.domain.Trade]{id=2}, EventID[id=31 bytes;threadID=1;sequenceID=2], 163]
[info 2020/08/23 13:12:13.763 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 3, PDX[14273398,example.client.domain.Trade]{id=3}, EventID[id=31 bytes;threadID=1;sequenceID=3], 164]
[info 2020/08/23 13:12:13.790 HST <Function Execution Processor2> tid=0x3d] SecondaryRoutingFunction processing args= [/Router, 8, PDX[14273398,example.client.domain.Trade]{id=8}, EventID[id=31 bytes;threadID=1;sequenceID=8], 169]
[info 2020/08/23 13:12:13.799 HST <Function Execution Processor2> tid=0x39] SecondaryRoutingFunction processing args= [/Router, 9, PDX[14273398,example.client.domain.Trade]{id=9}, EventID[id=31 bytes;threadID=1;sequenceID=9], 170]
If a server is killed while routing events, the server logs will contain messages like those below.
In this example, the last log messages from the PrimaryRoutingFunction in the killed server were:
[info 2020/08/23 14:17:05.681 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=756}, EventID[id=31 bytes;threadID=1;sequenceID=756]]
[info 2020/08/23 14:17:05.691 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=758}, EventID[id=31 bytes;threadID=1;sequenceID=758]]
[info 2020/08/23 14:17:05.709 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=764}, EventID[id=31 bytes;threadID=1;sequenceID=764]]
[info 2020/08/23 14:17:05.717 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=766}, EventID[id=31 bytes;threadID=1;sequenceID=766]]
[info 2020/08/23 14:17:05.742 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=769}, EventID[id=31 bytes;threadID=1;sequenceID=769]]
[info 2020/08/23 14:17:05.757 HST <ServerConnection on port 56033 Thread 1> tid=0x5a] PrimaryRoutingFunction processing args= [PDX[8290614,example.client.domain.Trade]{id=772}, EventID[id=31 bytes;threadID=1;sequenceID=772]]
This server was killed before the RoutingAsyncEventListener could process these six events.
Of these six events, three were also processed by the SecondaryRoutingFunction in one redundant server. Once the primary server was killed, the RoutingAsyncEventListener in that redundant server processed them as possible duplicates:
[info 2020/08/23 14:17:05.682 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 756, PDX[8290614,example.client.domain.Trade]{id=756}, EventID[id=31 bytes;threadID=1;sequenceID=756], 990]
[info 2020/08/23 14:17:05.692 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 758, PDX[8290614,example.client.domain.Trade]{id=758}, EventID[id=31 bytes;threadID=1;sequenceID=758], 766]
[info 2020/08/23 14:17:05.718 HST <Function Execution Processor2> tid=0x3a] SecondaryRoutingFunction processing args= [/Router, 766, PDX[8290614,example.client.domain.Trade]{id=766}, EventID[id=31 bytes;threadID=1;sequenceID=766], 908]
[info 2020/08/23 14:17:11.505 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x45] RoutingAsyncEventListener: Processing 5 events (total=266; possibleDuplicate=3)
key=758; value=PDX[8290614,example.client.domain.Trade]{id=758}; possibleDuplicate=true
key=766; value=PDX[8290614,example.client.domain.Trade]{id=766}; possibleDuplicate=true
key=756; value=PDX[8290614,example.client.domain.Trade]{id=756}; possibleDuplicate=true
The other redundant server processed the other three events in the same way:
[info 2020/08/23 14:17:05.710 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 764, PDX[8290614,example.client.domain.Trade]{id=764}, EventID[id=31 bytes;threadID=1;sequenceID=764], 793]
[info 2020/08/23 14:17:05.744 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 769, PDX[8290614,example.client.domain.Trade]{id=769}, EventID[id=31 bytes;threadID=1;sequenceID=769], 572]
[info 2020/08/23 14:17:05.758 HST <Function Execution Processor2> tid=0x37] SecondaryRoutingFunction processing args= [/Router, 772, PDX[8290614,example.client.domain.Trade]{id=772}, EventID[id=31 bytes;threadID=1;sequenceID=772], 822]
[info 2020/08/23 14:17:11.490 HST <Event Processor for GatewaySender_AsyncEventQueue_routing_aeq_0> tid=0x45] RoutingAsyncEventListener: Processing 4 events (total=266; possibleDuplicate=3)
key=769; value=PDX[8290614,example.client.domain.Trade]{id=769}; possibleDuplicate=true
key=764; value=PDX[8290614,example.client.domain.Trade]{id=764}; possibleDuplicate=true
key=772; value=PDX[8290614,example.client.domain.Trade]{id=772}; possibleDuplicate=true
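The failover behavior in these logs can be summarized with a small model. It is a sketch of the observable behavior only, under the assumption that a secondary holds its own queued copy of each event and only dispatches after taking over as primary; QueueMemberSketch and its methods are hypothetical and do not correspond to Geode classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one member's copy of a parallel queue.
final class QueueMemberSketch {
  private final Deque<Integer> queued = new ArrayDeque<>();
  private boolean primary;
  // Number of queued events that were enqueued before this member took over.
  private int suspectCount;

  QueueMemberSketch(boolean primary) {
    this.primary = primary;
  }

  void enqueue(int key) {
    queued.addLast(key);
  }

  // Called when the primary reports that its listener handled the key.
  void acknowledge(int key) {
    queued.remove(Integer.valueOf(key));
  }

  // Called when the primary is lost: everything still queued may already
  // have been handled there, so it is flagged as a possible duplicate.
  void promote() {
    primary = true;
    suspectCount = queued.size();
  }

  // Drains the queue if this member is primary; a secondary dispatches nothing.
  List<String> dispatch() {
    List<String> processed = new ArrayList<>();
    while (primary && !queued.isEmpty()) {
      boolean possibleDuplicate = suspectCount > 0;
      if (possibleDuplicate) {
        suspectCount--;
      }
      processed.add("key=" + queued.removeFirst() + "; possibleDuplicate=" + possibleDuplicate);
    }
    return processed;
  }
}
```

With keys 756, 758 and 766 enqueued on a secondary, dispatch returns nothing until promote is called; afterwards those three keys are dispatched with possibleDuplicate=true, while any key enqueued after the promotion is dispatched with possibleDuplicate=false.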
Two useful additions to Apache Geode would be: