Skip to content

Commit

Permalink
Added writecache test case.
Browse files Browse the repository at this point in the history
  • Loading branch information
lextm committed Apr 22, 2024
1 parent 9a47d8c commit 2b9b4c7
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 0 deletions.
23 changes: 23 additions & 0 deletions tests/data/UPS2/public.snmprec
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
1.3.6.1.2.1.1.1.0|4x|415043205765622f534e4d50204d616e6167656d656e74204361726420284d423a76342e312e302050463a76362e372e3220504e3a6170635f687730355f616f735f3637322e62696e204146313a76362e372e3220414e313a6170635f687730355f7270647532675f3637322e62696e204d4e3a4150383933322048523a303220534e3a20334635303341313639303433204d443a30312f32332f3230313929
1.3.6.1.2.1.1.2.0|6|1.3.6.1.4.1.318.1.3.4.6
1.3.6.1.2.1.1.3.0|67|165328680
1.3.6.1.2.1.1.4.0|4|Unknown
1.3.6.1.2.1.1.5.0|4x|7077722d646330312d7064752d7261636b332d3031
1.3.6.1.2.1.1.6.0|4:writecache|value=Unknown
1.3.6.1.2.1.1.7.0|2|72
1.3.6.1.2.1.1.8.0|67|0
1.3.6.1.2.1.1.9.1.2.1|6|1.3.6.1.6.3.1
1.3.6.1.2.1.1.9.1.2.2|6|1.3.6.1.6.3.10.3.1.1
1.3.6.1.2.1.1.9.1.2.3|6|1.3.6.1.6.3.11.3.1.1
1.3.6.1.2.1.1.9.1.2.4|6|1.3.6.1.6.3.15.2.1.1
1.3.6.1.2.1.1.9.1.2.5|6|1.3.6.1.6.3.16.2.1.1
1.3.6.1.2.1.1.9.1.3.1|4x|546865204d4942204d6f64756c652066726f6d20534e4d50763220656e746974696573
1.3.6.1.2.1.1.9.1.3.2|4x|534e4d50204d616e6167656d656e7420417263686974656374757265204d4942
1.3.6.1.2.1.1.9.1.3.3|4x|4d6573736167652050726f63657373696e6720616e64204469737061746368696e67204d4942
1.3.6.1.2.1.1.9.1.3.4|4x|55534d2055736572204d4942
1.3.6.1.2.1.1.9.1.3.5|4x|5641434d204d4942
1.3.6.1.2.1.1.9.1.4.1|67|0
1.3.6.1.2.1.1.9.1.4.2|67|0
1.3.6.1.2.1.1.9.1.4.3|67|0
1.3.6.1.2.1.1.9.1.4.4|67|0
1.3.6.1.2.1.1.9.1.4.5|67|0
121 changes: 121 additions & 0 deletions tests/test_writecache_variation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import os
import sys
import threading
import time
from snmpsim.commands.responder import main as responder_main
import pytest
from pysnmp.hlapi.asyncio import *
from pysnmp.hlapi.asyncio.slim import Slim
from pysnmp import debug

import asyncio

TIME_OUT = 5
PORT_NUMBER = 1611


@pytest.fixture(autouse=True)
def setup_args():
# Store the original sys.argv
original_argv = sys.argv
# Define your test arguments here
base_dir = os.path.dirname(os.path.abspath(__file__))
data_dir = os.path.join(base_dir, "data", "UPS2")
test_args = [
"responder.py",
f"--data-dir={data_dir}",
f"--agent-udpv4-endpoint=127.0.0.1:{PORT_NUMBER}",
f"--timeout={TIME_OUT}",
]
# Set sys.argv to your test arguments
sys.argv = test_args
# This will run before the test function
yield
# Restore the original sys.argv after the test function has finished
sys.argv = original_argv


# Fixture to run the application in a separate thread
@pytest.fixture
def run_app_in_background():
def target():
debug.setLogger(debug.Debug("app"))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
responder_main()
except KeyboardInterrupt:
print("Application interrupted.")
finally:
print("Application stopped.")
loop.close()

app_thread = threading.Thread(target=target)
app_thread.start()
# Allow some time for the application to initialize and run
time.sleep(1)
yield
# Simulate KeyboardInterrupt after the test is done
# This part may need to be adjusted based on how your application handles shutdown
app_thread.join(timeout=1)


@pytest.mark.asyncio
async def test_main_with_specific_args(run_app_in_background, capsys):
snmpEngine = SnmpEngine()
try:
# Create SNMP GET request v1
with Slim(1) as slim:
errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
"public",
"localhost",
PORT_NUMBER,
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysLocation", 0)),
retries=0,
)

assert errorIndication is None
assert errorStatus == 0
assert errorIndex == 0
assert len(varBinds) == 1
assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysLocation.0"
assert varBinds[0][1].prettyPrint() == "Unknown"
assert isinstance(varBinds[0][1], OctetString)

errorIndication, errorStatus, errorIndex, varBinds = await slim.set(
"public",
"localhost",
PORT_NUMBER,
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysLocation", 0), "Shanghai"),
retries=0,
)

assert errorIndication is None
assert errorStatus == 0
assert errorIndex == 0
assert len(varBinds) == 1
assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysLocation.0"
assert varBinds[0][1].prettyPrint() == "Shanghai"
assert isinstance(varBinds[0][1], OctetString)

errorIndication, errorStatus, errorIndex, varBinds = await slim.get(
"public",
"localhost",
PORT_NUMBER,
ObjectType(ObjectIdentity("SNMPv2-MIB", "sysLocation", 0)),
retries=0,
)

assert errorIndication is None
assert errorStatus == 0
assert errorIndex == 0
assert len(varBinds) == 1
assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysLocation.0"
assert varBinds[0][1].prettyPrint() == "Shanghai"
assert isinstance(varBinds[0][1], OctetString)
finally:
if snmpEngine.transportDispatcher:
snmpEngine.transportDispatcher.closeDispatcher()

await asyncio.sleep(TIME_OUT)
# Rest of your test code...

0 comments on commit 2b9b4c7

Please sign in to comment.