Calculating queue, transmission and total processing times for Apache Geode GatewaySender events can help with WAN capacity planning, such as deciding how much queue memory to allocate and how many dispatcher threads to configure. Unfortunately, this data is not readily available in Apache Geode out of the box.
This article describes how to implement a GatewayEventFilter that calculates these times and publishes them through a custom Apache Geode Statistics object readable via VSD (Visual Statistics Display).
All source code described in this article is available here.
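The TimingGatewayEventFilter implements the GatewayEventFilter interface to calculate queue, transmission and total processing times for GatewaySender events. The interface defines three callback methods: beforeEnqueue, beforeTransmit and afterAcknowledgement.
The GatewaySenderQueueStatistics creates a custom Statistics object that defines the following statistics: the queuedEvents, transmittedEvents and acknowledgedEvents counters, plus the total, minimum and maximum queue time, transmission time and processing time.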
Note: The queuedEvents statistic is the same as GatewaySenderStatistics eventsQueued, and the transmittedEvents statistic is the same as GatewaySenderStatistics eventsDistributed. They are included here for completeness.
The TimingGatewayEventFilter beforeEnqueue method is invoked for each GatewayQueueEvent before it is added to the queue. It tracks the queue start time and invokes the GatewaySenderQueueStatistics beforeEnqueue method.
    public boolean beforeEnqueue(GatewayQueueEvent event) {
        // Increment the queued events
        this.queueStatistics.beforeEnqueue();

        // Set the queue start time for this event
        GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
        this.queueStartTimes.put(gsei.getShadowKey(), System.currentTimeMillis());
        return true;
    }
The GatewaySenderQueueStatistics beforeEnqueue method increments the queued events.
    public void beforeEnqueue() {
        this.stats.incLong(queuedEventsId, 1);
    }
The TimingGatewayEventFilter beforeTransmit method is invoked for each GatewayQueueEvent before it is transmitted to the remote site. It retrieves the queue start time, tracks the transmission start time and invokes the GatewaySenderQueueStatistics beforeTransmit method.
    public boolean beforeTransmit(GatewayQueueEvent event) {
        // This method can be called multiple times for the same batch if the remote site is
        // not connected.
        GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
        if (!this.transmissionStartTimes.containsKey(gsei.getShadowKey())) {
            // Get the current time and update the statistics
            long currentTime = System.currentTimeMillis();
            this.queueStatistics.beforeTransmit(
                this.queueStartTimes.get(gsei.getShadowKey()), currentTime);

            // Set the transmit start time for this event
            this.transmissionStartTimes.put(gsei.getShadowKey(), currentTime);
        }
        return true;
    }
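The containsKey check followed by put in the method above is two separate map operations. Assuming transmissionStartTimes is a concurrent map keyed by the Long shadow key (its declaration is not shown in this article), the same "record only the first attempt" behavior could also be expressed as a single atomic putIfAbsent call. The sketch below is an illustration with plain JDK types, not the article's source; FirstAttemptTracker and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper that records only the first transmission attempt per event key. */
final class FirstAttemptTracker {
    // Assumed shape of the map: shadow key -> transmission start time in milliseconds.
    private final ConcurrentMap<Long, Long> transmissionStartTimes = new ConcurrentHashMap<>();

    /**
     * Records nowMillis as the start time for the key if none is recorded yet.
     * Returns true only for the first attempt; retries leave the original time in place.
     */
    boolean recordFirstAttempt(long key, long nowMillis) {
        return transmissionStartTimes.putIfAbsent(key, nowMillis) == null;
    }

    /** Removes and returns the recorded start time, or null if the key was never recorded. */
    Long remove(long key) {
        return transmissionStartTimes.remove(key);
    }
}
```

In a filter, the statistics update would run only when recordFirstAttempt returns true, so a batch that is retried while the remote site is unreachable is counted once.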
The GatewaySenderQueueStatistics beforeTransmit method increments the transmitted events and total, minimum and maximum time spent on the queue.
    public long beforeTransmit(long queueStartTime, long currentTime) {
        long queueTime = currentTime - queueStartTime;
        this.stats.incLong(transmittedEventsId, 1);

        // Update queue time statistics
        this.stats.incLong(totalQueueTimeId, queueTime);

        // The first call flips the flag (assumed to start as false), so the first
        // recorded value always becomes the minimum; later calls compare values.
        if (this.minimumQueueTimeNotSet.compareAndSet(false, true)
                || this.stats.getLong(minimumQueueTimeId) > queueTime) {
            this.stats.setLong(minimumQueueTimeId, queueTime);
        }
        if (this.stats.getLong(maximumQueueTimeId) < queueTime) {
            this.stats.setLong(maximumQueueTimeId, queueTime);
        }
        return currentTime;
    }
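The minimum and maximum updates above read the statistic with getLong and write it with setLong in separate steps. If the filter is invoked from more than one thread (for example with several dispatcher threads), two updates could interleave and one value could overwrite the other. One alternative technique, shown here with plain JDK atomics rather than the Geode Statistics API, is to keep the running minimum and maximum in AtomicLong fields and copy them into the statistics afterwards. MinMaxTracker is a hypothetical name and is not part of the article's source.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical thread-safe min/max tracker built on JDK atomics. */
final class MinMaxTracker {
    // Sentinel values guarantee that the first recorded value replaces both fields.
    private final AtomicLong minimum = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong maximum = new AtomicLong(Long.MIN_VALUE);

    /** Folds a new sample into the running minimum and maximum. */
    void record(long value) {
        // accumulateAndGet applies the function atomically, retrying on contention.
        minimum.accumulateAndGet(value, Math::min);
        maximum.accumulateAndGet(value, Math::max);
    }

    long minimum() {
        return minimum.get();
    }

    long maximum() {
        return maximum.get();
    }
}
```

With this approach no separate "not set" flag is needed, because the sentinels make the first sample win both comparisons.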
The TimingGatewayEventFilter afterAcknowledgement method is invoked for each GatewayQueueEvent after its acknowledgement has been received from the remote site. It retrieves the queue and transmission start times and invokes the GatewaySenderQueueStatistics afterAcknowledgement method.
    public void afterAcknowledgement(GatewayQueueEvent event) {
        // Get and remove the queue and transmission start times for this event
        GatewaySenderEventImpl gsei = (GatewaySenderEventImpl) event;
        Long queueStartTime = this.queueStartTimes.remove(gsei.getShadowKey());
        Long transmissionStartTime = this.transmissionStartTimes.remove(gsei.getShadowKey());

        // If the event was not transmitted by this member, ignore it.
        // Only handle primary events.
        if (transmissionStartTime != null) {
            // Update the statistics
            this.queueStatistics.afterAcknowledgement(transmissionStartTime, queueStartTime);
        }
    }
The GatewaySenderQueueStatistics afterAcknowledgement method increments the acknowledged events and total, minimum and maximum transmission time as well as the total, minimum and maximum processing time.
    public void afterAcknowledgement(long transmissionStartTime, long queueStartTime) {
        long currentTime = System.currentTimeMillis();
        long transmissionTime = currentTime - transmissionStartTime;
        long processTime = currentTime - queueStartTime;

        // Update acknowledged events
        this.stats.incLong(acknowledgedEventsId, 1);

        // Update transmission time statistics
        this.stats.incLong(totalTransmissionTimeId, transmissionTime);
        if (this.minimumTransmissionTimeNotSet.compareAndSet(false, true)
                || this.stats.getLong(minimumTransmissionTimeId) > transmissionTime) {
            this.stats.setLong(minimumTransmissionTimeId, transmissionTime);
        }
        if (this.stats.getLong(maximumTransmissionTimeId) < transmissionTime) {
            this.stats.setLong(maximumTransmissionTimeId, transmissionTime);
        }

        // Update processing time statistics
        this.stats.incLong(totalProcessingTimeId, processTime);
        if (this.minimumProcessingTimeNotSet.compareAndSet(false, true)
                || this.stats.getLong(minimumProcessingTimeId) > processTime) {
            this.stats.setLong(minimumProcessingTimeId, processTime);
        }
        if (this.stats.getLong(maximumProcessingTimeId) < processTime) {
            this.stats.setLong(maximumProcessingTimeId, processTime);
        }
    }
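The three durations computed across these methods are related: queue time runs from enqueue to the first transmission attempt, transmission time from the first transmission attempt to the acknowledgement, and processing time from enqueue to the acknowledgement, so for a single event the processing time is the sum of the other two. The following minimal sketch makes that arithmetic explicit. EventTimeline and its field names are hypothetical and use fixed timestamps instead of System.currentTimeMillis().

```java
/** Hypothetical value class holding the three timestamps tracked for one event. */
final class EventTimeline {
    final long enqueuedAt;      // recorded in beforeEnqueue
    final long transmittedAt;   // recorded on the first beforeTransmit call
    final long acknowledgedAt;  // taken in afterAcknowledgement

    EventTimeline(long enqueuedAt, long transmittedAt, long acknowledgedAt) {
        this.enqueuedAt = enqueuedAt;
        this.transmittedAt = transmittedAt;
        this.acknowledgedAt = acknowledgedAt;
    }

    /** Time spent waiting on the queue before the first transmission attempt. */
    long queueTime() {
        return transmittedAt - enqueuedAt;
    }

    /** Time from the first transmission attempt until the acknowledgement. */
    long transmissionTime() {
        return acknowledgedAt - transmittedAt;
    }

    /** End-to-end time from enqueue until the acknowledgement. */
    long processingTime() {
        return acknowledgedAt - enqueuedAt;
    }
}
```

Because the transmission start time is recorded only on the first beforeTransmit call, any retries caused by a disconnected remote site are counted as transmission time rather than queue time.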
Sample vsd charts for the statistics defined by the GatewaySenderQueueStatistics are shown below.
This chart shows the number of events queued, transmitted and acknowledged for the specific GatewaySender.
This chart shows total queue, transmission and processing times for all of the GatewaySender’s events.
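Totals like these are often easier to interpret as per-event averages. In the code above, totalQueueTime is accumulated alongside transmittedEvents, while totalTransmissionTime and totalProcessingTime are accumulated alongside acknowledgedEvents, so each total is divided by the counter it was updated with. The sketch below assumes the statistic values have already been read (for example from VSD); AverageTimes is a hypothetical helper and the numbers in main are illustrative only, not measurements.

```java
/** Hypothetical helper that derives per-event averages from totals and counters. */
final class AverageTimes {
    private AverageTimes() {
    }

    /** Returns the average time per event in milliseconds, or 0.0 if no events were counted. */
    static double average(long totalTimeMillis, long eventCount) {
        return eventCount == 0 ? 0.0 : (double) totalTimeMillis / eventCount;
    }

    public static void main(String[] args) {
        // Illustrative values only; real values would come from the statistics.
        long transmittedEvents = 1_000L;
        long acknowledgedEvents = 1_000L;
        long totalQueueTime = 250_000L;
        long totalTransmissionTime = 40_000L;
        long totalProcessingTime = 290_000L;

        System.out.println("average queue time (ms): "
            + average(totalQueueTime, transmittedEvents));
        System.out.println("average transmission time (ms): "
            + average(totalTransmissionTime, acknowledgedEvents));
        System.out.println("average processing time (ms): "
            + average(totalProcessingTime, acknowledgedEvents));
    }
}
```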
Having GatewaySender event queue, transmission and processing time statistics like these available out of the box would be a useful addition to Apache Geode.