Skip to content

About test_subs_oper.py #26

@fourwilling

Description

@fourwilling

Hi rjarry,

I tried to modify one of your tests as below. However, it seems that this can only run successfully for one time. When I ran it for the second time, it gave me "Module "sysrepo-example" was not found in sysrepo"

Do you know where I went wrong? Thank you!

import logging
import os
import unittest
import sysrepo

YANG_FILE = os.path.join(
    os.path.dirname(os.path.dirname(__file__)), "examples/sysrepo-example.yang"
)
sysrepo.configure_logging(stderr_level=logging.ERROR)

# ------------------------------------------------------------------------------
class OperSubscriptionTest(unittest.TestCase):
    def test_oper_sub(self):
        priv = object()
        state = None

        def oper_data_cb(xpath, private_data):
            self.assertEqual(xpath, "/sysrepo-example:state")
            self.assertEqual(private_data, priv)
            return state

        with sysrepo.SysrepoConnection() as conn:
            # conn.remove_module("sysrepo-example")
            conn.install_module(YANG_FILE, enabled_features=["turbo"])
            with conn.start_session("operational") as sess:
                sess.subscribe_oper_data_request(
                    "sysrepo-example",
                    "/sysrepo-example:state",
                    oper_data_cb,
                    private_data=priv,
                    strict=True,
                )
                with conn.start_session("operational") as op_sess:
                    state = {
                        "state": {
                            "system": {"hostname": "foo"},
                            "network": {
                                "interface": [
                                    {
                                        "name": "eth0",
                                        "address": "1.2.3.4/24",
                                        "up": True,
                                        "stats": {"rx": 42, "tx": 42},
                                    }
                                ]
                            },
                        }
                    }
                    self.assertEqual(op_sess.get_data("/sysrepo-example:state"), state)
                    state = {"state": {"invalid": True}}
                    with self.assertRaises(sysrepo.SysrepoCallbackFailedError):
                        op_sess.get_data("/sysrepo-example:state")

            conn.disconnect()
            conn.remove_module("sysrepo-example")

unittest.main()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions