Skip to content

Commit

Permalink
Fix redis memory leak issue in PhysicalEntityCacheUpdater (#343)
Browse files Browse the repository at this point in the history
- What I did
Fixes the redis memory leak bug:
#342
There's chance that the physical_entity_updater creates subscriptions to redis, and never consume the messages due to exceptions.
Then the memory buffer(omem) of redis client starts to increase and never end, redis memory leaks.
The reason is all 5 physical entity cache updaters inherit from PhysicalEntityCacheUpdater.
In the first update_data, they initialize the psubscription to redis database.
 self.pub_sub_dict[db_index] = mibs.get_redis_pubsub(db, db.STATE_DB, self.get_key_pattern()) 
And everytime when the update_data is called again, it get the message from the psub and process.
 msg = pubsub.get_message() 
And outside, in the logic of the MIBUpdater, it calls update_data more frequently than reinit_data.
A side-effect is, if reinit_data failed forever, the update_counter will not been cleaned, then update_data will not be called forever.
 self.update_counter = 0 
So the problem is, at the begining, the psub is created at the first update_data and all things work well, until an unrecoverable issue happened,
PHYSICAL_ENTITY_INFO|PSU * missing in the database (it's a pmon issue)
This causes both reinit_data and update_data to be failed, because all of them finally call _update_per_namespace_data, which tries to cast an empty string '' to int and raises ValueError.
Then the update_data is not called forever, because reinit_data will never success.
But the previously established psubscription is still there, and no one gonna to consume it(the update_data is blocked), then Redis database memory starts to slowly leak.

- How I did it

Catch the exception during the loop of reinit_data, make sure the reinit_data of every physical_entity_updater will be called
Clear message and cancel the subscription in the reinit_data, avoid the message accumulates in the redis subscription queue
- How to verify it
Tested on Cisco chassis, the memory is not leaking anymore.
  • Loading branch information
yejianquan authored and mssonicbld committed Dec 18, 2024
1 parent 9d5ce53 commit dd57555
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,4 @@ fabric.properties

gh-release.patch
tests/test_cpuUtilizationHandler.py
tests/test-results.xml
2 changes: 1 addition & 1 deletion src/ax_interface/mib.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async def start(self):
redis_exception_happen = False
except RuntimeError:
# Any unexpected exception or error, log it and keep running
logger.exception("MIBUpdater.start() caught an unexpected exception during update_data()")
logger.exception("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
# When redis server restart, swsscommon will throw swsscommon.RedisError, redis connection need re-initialize in reinit_data()
# TODO: change to swsscommon.RedisError
redis_exception_happen = True
Expand Down
15 changes: 15 additions & 0 deletions src/sonic_ax_impl/mibs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,21 @@ def get_redis_pubsub(db_conn, db_name, pattern):
return pubsub


def cancel_redis_pubsub(pubsub, db_conn, db_name, pattern):
db = db_conn.get_dbid(db_name)
logger.debug(f"Cancel subscription {db} {pattern}")
pubsub.punsubscribe("__keyspace@{}__:{}".format(db, pattern))
return pubsub


def clear_pubsub_msg(pubsub):
while True:
msg = pubsub.get_message()
logger.debug("Clearing pubsub {}, get and drop message {}".format(pubsub, msg))
if not msg:
break


class RedisOidTreeUpdater(MIBUpdater):
def __init__(self, prefix_str):
super().__init__()
Expand Down
42 changes: 41 additions & 1 deletion src/sonic_ax_impl/mibs/ietf/rfc2737.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,33 @@ def reinit_data(self):
self.physical_name_map[chassis_mgmt_sub_id] = name
self.physical_fru_map[chassis_mgmt_sub_id] = self.NOT_REPLACEABLE

exceptions = []
has_runtime_err = False
# Catch exception in the iteration
# This makes sure if any exception is raised in the mid of loop
# every updater's reinit_data function will be always called
# So that the redis subscriptions always get chance to be cleaned,
# Otherwise if the exception never recover,
# the redis subscription keeps increasing but never got consumed,
# this causes redis memory leak.
for updater in self.physical_entity_updaters:
updater.reinit_data()
try:
updater.reinit_data()
except BaseException as e:
if isinstance(e, RuntimeError):
has_runtime_err = True
# Log traceback so that we know the original error details
mibs.logger.error(e, exc_info=True)
exceptions.append(e)

# The RuntimeError will be considered as Redis connection error
# And will trigger re-init connection, if the exceptions contain any RuntimeError
# We raise runtime error
if exceptions:
if has_runtime_err:
raise RuntimeError(exceptions)
else:
raise Exception(exceptions)

def update_data(self):
# This code is not executed in unit test, since mockredis
Expand Down Expand Up @@ -648,6 +673,21 @@ def __init__(self, mib_updater):
self.entity_to_oid_map = {}

def reinit_data(self):

# Redis subscriptions are established and consumed in update_data,
# but if there's stable exception during update logic,
# the reinit_data will be called, but the update_data is never called.
# The message is sent into subscription queue, but never got consumed,
# this causes Redis memory leaking.
# Hence clear the message in the subscription and cancel the subscription during reinit_data
for db_index in list(self.pub_sub_dict):
pubsub = self.pub_sub_dict[db_index]
db_conn = self.mib_updater.statedb[db_index]
# clear message in the subscription and cancel the subscription
mibs.clear_pubsub_msg(pubsub)
mibs.cancel_redis_pubsub(pubsub, db_conn, db_conn.STATE_DB, self.get_key_pattern())
del self.pub_sub_dict[db_index]

self.entity_to_oid_map.clear()
# retrieve the initial list of entity in db
key_info = Namespace.dbs_keys(self.mib_updater.statedb, mibs.STATE_DB, self.get_key_pattern())
Expand Down
3 changes: 3 additions & 0 deletions tests/mock_tables/dbconnector.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def get_message(self):
def psubscribe(self, *args, **kwargs):
pass

def punsubscribe(self, *args, **kwargs):
pass

def __call__(self, *args, **kwargs):
return self

Expand Down
2 changes: 1 addition & 1 deletion tests/test_rfc1213.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_NextHopUpdater_redis_exception(self):

# check warning
expected = [
mock.call("MIBUpdater.start() caught an unexpected exception during update_data()")
mock.call("MIBUpdater.start() caught a RuntimeError during update_data(), will reinitialize the connections")
]
mocked_exception.assert_has_calls(expected)

Expand Down
138 changes: 138 additions & 0 deletions tests/test_rfc2737.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import os
import sys
from unittest import TestCase

import pytest
from sonic_ax_impl.mibs.ietf.rfc2737 import PhysicalTableMIBUpdater


if sys.version_info.major == 3:
from unittest import mock
else:
import mock

modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(modules_path, 'src'))


class TestPhysicalTableMIBUpdater(TestCase):

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[Exception('mocked error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(ThermalCacheUpdater) raises exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_exception_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(Exception) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[Exception('mocked error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_first(self, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as excinfo,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data') as mocked_xcvr_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch(
'sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_thermal_reinit_data.assert_called()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(excinfo.value) == "[RuntimeError('mocked runtime error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The last updater(XcvrCacheUpdater) raises Runtime exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
def test_PhysicalTableMIBUpdater_runtime_exc_in_reinit_data_wont_block_reinit_iteration_last(self, mocked_xcvr_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as exc_info,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data') as mocked_thermal_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(exc_info.value) == "[RuntimeError('mocked runtime error')]"

# Given: 5 physical updaters are register into reinit of PhysicalTableMIBUpdater
# When: The first(XcvrCacheUpdater) and last updater(ThermalCacheUpdater)
# raises Runtime exception and Exception in the reinit
# Then: The remaining updaters should execute reinit without any affection,
# and the redis un-subscription should be called
# Both the RuntimeError and Exception should be caught and combined as RuntimeError then been raised
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.XcvrCacheUpdater.reinit_data', side_effect=RuntimeError('mocked runtime error'))
@mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.ThermalCacheUpdater.reinit_data', side_effect=Exception('mocked error'))
def test_PhysicalTableMIBUpdater_multi_exception(self, mocked_xcvr_reinit_data, mocked_thermal_reinit_data):
updater = PhysicalTableMIBUpdater()

with (pytest.raises(RuntimeError) as exc_info,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.PsuCacheUpdater.reinit_data') as mocked_psu_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanDrawerCacheUpdater.reinit_data') as mocked_fan_drawer_reinit_data,
mock.patch('sonic_ax_impl.mibs.ietf.rfc2737.FanCacheUpdater.reinit_data') as mocked_fan_cache_reinit_data,
mock.patch('sonic_ax_impl.mibs.cancel_redis_pubsub') as mocked_cancel_redis_pubsub):
updater.reinit_data()
mocked_xcvr_reinit_data.assert_called()
mocked_psu_reinit_data.assert_called()
mocked_fan_drawer_reinit_data.assert_called()
mocked_fan_cache_reinit_data.assert_called()
mocked_thermal_reinit_data.assert_called()
mocked_cancel_redis_pubsub.assert_called()
assert str(exc_info.value) == "[RuntimeError('mocked runtime error'), Exception('mocked error')]"

0 comments on commit dd57555

Please sign in to comment.