Skip to content

Commit

Permalink
Refresh mbeans cache when they are registered/unregistered
Browse files Browse the repository at this point in the history
Signed-off-by: Max Melentyev <max.melentyev@reddit.com>
  • Loading branch information
max-melentyev committed Oct 24, 2024
1 parent 692f9da commit 32fae25
Showing 1 changed file with 66 additions and 24 deletions.
90 changes: 66 additions & 24 deletions collector/src/main/java/io/prometheus/jmx/JmxScraper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularType;
import javax.management.relation.MBeanServerNotificationFilter;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
Expand Down Expand Up @@ -62,9 +63,23 @@ void recordBean(

// Values cached per connection.
private MBeanServerConnection _beanConn;
private Set<ObjectName> mBeanNames;
private ObjectNameAttributeFilter objectNameAttributeFilter;
private JmxMBeanPropertyCache jmxMBeanPropertyCache;
private Cache cache;
private boolean cacheIsStale = false;

private class Cache {
private final Set<ObjectName> mBeanNames;
private final ObjectNameAttributeFilter objectNameAttributeFilter;
private final JmxMBeanPropertyCache jmxMBeanPropertyCache;

private Cache(
Set<ObjectName> mBeanNames,
ObjectNameAttributeFilter objectNameAttributeFilter,
JmxMBeanPropertyCache jmxMBeanPropertyCache) {
this.mBeanNames = mBeanNames;
this.objectNameAttributeFilter = objectNameAttributeFilter;
this.jmxMBeanPropertyCache = jmxMBeanPropertyCache;
}
}

public JmxScraper(
String jmxUrl,
Expand Down Expand Up @@ -119,9 +134,41 @@ private MBeanServerConnection connectToMBeanServer() throws Exception {
return jmxc.getMBeanServerConnection();
}

private void loadMBeanNames(MBeanServerConnection beanConn) throws Exception {
private synchronized MBeanServerConnection getMBeanServerConnection() throws Exception {
if (_beanConn == null) {
cacheIsStale = true;
_beanConn = connectToMBeanServer();
// Subscribe to MBeans register/unregister events to invalidate cache
MBeanServerNotificationFilter filter = new MBeanServerNotificationFilter();
filter.enableAllObjectNames();
_beanConn.addNotificationListener(
MBeanServerDelegate.DELEGATE_NAME,
(notification, handback) -> {
String type = notification.getType();
if (MBeanServerNotification.REGISTRATION_NOTIFICATION.equals(type)
|| MBeanServerNotification.UNREGISTRATION_NOTIFICATION.equals(
type)) {
LOGGER.log(FINE, "Marking cache as stale due to %s", type);
// Mark cache as stale instead of refreshing it immediately
// to debounce multiple notifications.
synchronized (this) {
cacheIsStale = true;
}
}
},
filter,
null);
}
if (cacheIsStale) {
cache = fetchCache(_beanConn);
cacheIsStale = false;
}
return _beanConn;
}

private Cache fetchCache(MBeanServerConnection beanConn) throws Exception {
// Query MBean names, see #89 for reasons queryMBeans() is used instead of queryNames()
mBeanNames = new HashSet<>();
Set<ObjectName> mBeanNames = new HashSet<>();
for (ObjectName name : includeObjectNames) {
for (ObjectInstance instance : beanConn.queryMBeans(name, null)) {
mBeanNames.add(instance.getObjectName());
Expand All @@ -134,37 +181,32 @@ private void loadMBeanNames(MBeanServerConnection beanConn) throws Exception {
}
}

this.jmxMBeanPropertyCache = new JmxMBeanPropertyCache(mBeanNames);
ObjectNameAttributeFilter attributeFilter = defaultObjectNameAttributeFilter.dup();
attributeFilter.onlyKeepMBeans(mBeanNames);

this.objectNameAttributeFilter = defaultObjectNameAttributeFilter.dup();
objectNameAttributeFilter.onlyKeepMBeans(mBeanNames);
return new Cache(mBeanNames, attributeFilter, new JmxMBeanPropertyCache(mBeanNames));
}

/**
* Get a list of mbeans on host_port and scrape their values.
*
* <p>Values are passed to the receiver in a single thread.
*/
public void doScrape(MBeanReceiver receiver) throws Exception {
synchronized (this) {
if (_beanConn == null) {
_beanConn = connectToMBeanServer();
loadMBeanNames(_beanConn);
}
}
MBeanServerConnection beanConn = _beanConn;
public synchronized void doScrape(MBeanReceiver receiver) throws Exception {
// Method is synchronized to avoid multiple scrapes running concurrently
// and let one of them refresh the cache in the middle of the scrape.

for (ObjectName objectName : mBeanNames) {
MBeanServerConnection beanConn = getMBeanServerConnection();

for (ObjectName objectName : cache.mBeanNames) {
long start = System.nanoTime();
scrapeBean(receiver, beanConn, objectName);
LOGGER.log(FINE, "TIME: %d ns for %s", System.nanoTime() - start, objectName);
}
}

private void scrapeBean(
MBeanReceiver receiver,
MBeanServerConnection beanConn,
ObjectName mBeanName) {
MBeanReceiver receiver, MBeanServerConnection beanConn, ObjectName mBeanName) {
MBeanInfo mBeanInfo;

try {
Expand All @@ -186,7 +228,7 @@ private void scrapeBean(
continue;
}

if (objectNameAttributeFilter.exclude(mBeanName, mBeanAttributeInfo.getName())) {
if (cache.objectNameAttributeFilter.exclude(mBeanName, mBeanAttributeInfo.getName())) {
continue;
}

Expand Down Expand Up @@ -249,7 +291,7 @@ private void scrapeBean(
receiver,
mBeanName,
mBeanDomain,
jmxMBeanPropertyCache.getKeyPropertyList(mBeanName),
cache.jmxMBeanPropertyCache.getKeyPropertyList(mBeanName),
new LinkedList<>(),
mBeanAttributeInfo.getName(),
mBeanAttributeInfo.getType(),
Expand Down Expand Up @@ -289,7 +331,7 @@ private void processAttributesOneByOne(
receiver,
mbeanName,
mbeanName.getDomain(),
jmxMBeanPropertyCache.getKeyPropertyList(mbeanName),
cache.jmxMBeanPropertyCache.getKeyPropertyList(mbeanName),
new LinkedList<>(),
attr.getName(),
attr.getType(),
Expand Down Expand Up @@ -447,7 +489,7 @@ private void processBeanValue(
attrDescription,
value.toString());
} else {
objectNameAttributeFilter.add(objectName, attrName);
cache.objectNameAttributeFilter.add(objectName, attrName);
LOGGER.log(FINE, "%s%s scrape: %s not exported", domain, beanProperties, attrType);
}
}
Expand Down

0 comments on commit 32fae25

Please sign in to comment.