Source code for noc.core.script.base

# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## SA Script base
##----------------------------------------------------------------------
## Copyright (C) 2007-2016 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------

## Python modules
import re
import logging
import time
import itertools
import operator
## NOC modules
from snmp.base import SNMP
from snmp.beef import BeefSNMP
from http.base import HTTP
from noc.lib.log import PrefixLoggerAdapter
from noc.lib.validators import is_int
from context import (ConfigurationContextManager, CacheContextManager,
                     IgnoredExceptionsContextManager)
from noc.core.profile.loader import loader as profile_loader
from noc.core.handler import get_handler
from noc.lib.mac import MAC
from beef import Beef


[docs]class BaseScript(object): """ Service Activation script base class """ class __metaclass__(type): """ Process @match decorators """ def __new__(mcs, name, bases, attrs): n = type.__new__(mcs, name, bases, attrs) n._execute_chain = sorted( (v for v in attrs.itervalues() if hasattr(v, "_seq")), key=operator.attrgetter("_seq") ) return n name = None """ Script name in form of <vendor>.<system>.<name> """ TIMEOUT = 120 """ Default script timeout """ cache = False """ Enable call cache If True, script result will be cached and reused during lifetime of parent script """ interface = None """ Implemented interface """ requires = [] """ Scripts required by generic script. For common scripts - empty list For generics - list of pairs (script_name, interface) """ base_logger = logging.getLogger(name or "script") _x_seq = itertools.count() # Common errors
[docs] class ScriptError(Exception): """ Script error :raise: ScriptError """
# LoginError = LoginError
[docs] class CLISyntaxError(ScriptError): """ Syntax error :raise: CLISyntaxError """
[docs] class CLIOperationError(ScriptError): """ Operational CLI error :raise: CLIOperationError """
# CLITransportError = CLITransportError # CLIDisconnectedError = CLIDisconnectedError # TimeOutError = TimeOutError
[docs] class NotSupportedError(ScriptError): """ Feature is not supported :raise: NotSupportedError """
[docs] class UnexpectedResultError(ScriptError): """ Unexpected result :raise: UnexpectedResultError """
hexbin = { "0": "0000", "1": "0001", "2": "0010", "3": "0011", "4": "0100", "5": "0101", "6": "0110", "7": "0111", "8": "1000", "9": "1001", "a": "1010", "b": "1011", "c": "1100", "d": "1101", "e": "1110", "f": "1111" } cli_protocols = { "telnet": "noc.core.script.cli.telnet.TelnetCLI", "ssh": "noc.core.script.cli.ssh.SSHCLI", "beef": "noc.core.script.cli.beef.BeefCLI" } def __init__(self, service, credentials, args=None, capabilities=None, version=None, parent=None, timeout=None, name=None, collect_beef=False): self.service = service self.tos = self.service.config.tos self.pool = self.service.config.pool self.parent = parent self._motd = None name = name or self.name self.logger = PrefixLoggerAdapter( self.base_logger, "%s] [%s" % (self.name, credentials.get("address", "-")) ) if self.parent: self.profile = self.parent.profile else: self.profile = profile_loader.get_profile( ".".join(name.split(".")[:2]) )() self.credentials = credentials or {} self.version = version or {} self.capabilities = capabilities self.timeout = timeout or self.get_timeout() self.start_time = None self.args = self.clean_input(args or {}) self.cli_stream = None if collect_beef: self.beef = Beef(script=self.name) self.logger.info("Collecting beef %s", self.beef.uuid) else: self.beef = None if self.parent: self.snmp = self.root.snmp self.beef = self.parent.beef else: if self.credentials.get("beef"): self.snmp = BeefSNMP(self) else: self.snmp = SNMP(self, beef=self.beef) self.http = HTTP(self) self.to_disable_pager = not self.parent and self.profile.command_disable_pager self.to_shutdown_session = False self.scripts = ScriptsHub(self) self.is_cached = False # Cache CLI and SNMP calls, if set # Suitable only when self.parent is None. # Cached results for scripts marked with "cache" self.call_cache = {} # if not parent and version and not name.endswith(".get_version"): self.logger.debug("Filling get_version cache with %s", version) s = name.split(".") self.set_cache( "%s.%s.get_version" % (s[0], s[1]), {}, version ) # if self.profile.setup_script: self.profile.setup_script(self) def __call__(self, *args, **kwargs): self.args = kwargs return self.run()
[docs] def clean_input(self, args): """ Cleanup input parameters against interface :param args: Arguments for cleaning method :return: Cleaned input :rtype: str """ return self.interface().script_clean_input(self.profile, **args)
[docs] def clean_output(self, result): """ Clean script result against interface :param str result: Output from device :return: Cleaned output :rtype: str """ return self.interface().script_clean_result(self.profile, result)
[docs] def run(self): """ Run script :return: Result return execute() of method """ self.start_time = time.time() self.logger.info("Running. Input arguments: %s, timeout %s", self.args, self.timeout) # Use cached result when available cache_hit = False if self.cache and self.parent: try: result = self.get_cache(self.name, self.args) self.logger.info("Using cached result") cache_hit = True except KeyError: pass # Execute script if not cache_hit: try: result = self.execute(**self.args) if self.cache and self.parent and result: self.logger.info("Caching result") self.set_cache(self.name, self.args, result) finally: if not self.parent: # Close SNMP socket when necessary self.snmp.close() # Close CLI socket when necessary self.close_cli_stream() # Close HTTP Client self.http.close() # Clean result result = self.clean_output(result) self.logger.debug("Result: %s", result) runtime = time.time() - self.start_time self.logger.info("Complete (%.2fms)", runtime * 1000) return result
@classmethod
[docs] def compile_match_filter(cls, *args, **kwargs): """ Compile arguments into version check function :return: Returns callable accepting self and version hash arguments """ c = [lambda self, x, g=f: g(x) for f in args] for k, v in kwargs.items(): # Split to field name and lookup operator if "__" in k: f, o = k.split("__") else: f = k o = "exact" # Check field name if f not in ("vendor", "platform", "version", "image"): raise Exception("Invalid field '%s'" % f) # Compile lookup functions if o == "exact": c += [lambda self, x, f=f, v=v: x[f] == v] elif o == "iexact": c += [lambda self, x, f=f, v=v: x[f].lower() == v.lower()] elif o == "startswith": c += [lambda self, x, f=f, v=v: x[f].startswith(v)] elif o == "istartswith": c += [lambda self, x, f=f, v=v: x[f].lower().startswith(v.lower())] elif o == "endswith": c += [lambda self, x, f=f, v=v: x[f].endswith(v)] elif o == "iendswith": c += [lambda self, x, f=f, v=v: x[f].lower().endswith(v.lower())] elif o == "contains": c += [lambda self, x, f=f, v=v: v in x[f]] elif o == "icontains": c += [lambda self, x, f=f, v=v: v.lower() in x[f].lower()] elif o == "in": c += [lambda self, x, f=f, v=v: x[f] in v] elif o == "regex": c += [lambda self, x, f=f, v=re.compile(v): v.search(x[f]) is not None] elif o == "iregex": c += [lambda self, x, f=f, v=re.compile(v, re.IGNORECASE): v.search(x[f]) is not None] elif o == "isempty": # Empty string or null c += [lambda self, x, f=f, v=v: not x[f] if v else x[f]] elif f == "version": if o == "lt": # < c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) < 0] elif o == "lte": # <= c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) <= 0] elif o == "gt": # > c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) > 0] elif o == "gte": # >= c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) >= 0] else: raise Exception("Invalid lookup operation: %s" % o) else: raise Exception("Invalid lookup operation: %s" % o) # Combine expressions into single lambda return reduce( lambda x, y: lambda self, v, x=x, y=y: ( x(self, v) and y(self, v) ), c, lambda self, x: True )
@classmethod
[docs] def match(cls, *args, **kwargs): """ execute method decorator """ def wrap(f): # Append to the execute chain if hasattr(f, "_match"): old_filter = f._match f._match = lambda self, v, old_filter=old_filter, new_filter=new_filter: new_filter(self, v) or old_filter(self, v) else: f._match = new_filter f._seq = cls._x_seq.next() return f # Compile check function new_filter = cls.compile_match_filter(*args, **kwargs) # Return decorated function return wrap
[docs] def match_version(self, *args, **kwargs): """ inline version for BaseScript.match """ if not self.version: self.version = self.scripts.get_version() return self.compile_match_filter(*args, **kwargs)( self, self.version )
[docs] def execute(self, **kwargs): """ Default script behavior: Pass through _execute_chain and call appropriative handler """ if self._execute_chain and not self.name.endswith(".get_version"): # Get version information if not self.version: self.version = self.scripts.get_version() # Find and execute proper handler for f in self._execute_chain: if f._match(self, self.version): return f(self, **kwargs) # Raise error raise self.NotSupportedError()
[docs] def cleaned_config(self, config): """ Clean up config from all unnecessary trash :param str config: Configuration for clean :return: Clean up config from all unnecessary trash :rtype: str """ return self.profile.cleaned_config(config)
[docs] def strip_first_lines(self, text, lines=1): """ Strip first *lines* :param str text: Text :param int lines: Number of lines for strip :return: Text with stripped first N lines :rtype: str """ t = text.split("\n") if len(t) <= lines: return "" else: return "\n".join(t[lines:])
[docs] def expand_rangelist(self, s): """ Expand expressions like "1,2,5-7" to [1, 2, 5, 6, 7] :param str s: Comma-separated list :return: [1, 2, 5, 6, 7] :rtype: list """ result = {} for x in s.split(","): x = x.strip() if x == "": continue if "-" in x: l, r = [int(y) for y in x.split("-")] if l > r: x = r r = l l = x for i in range(l, r + 1): result[i] = None else: result[int(x)] = None return sorted(result.keys())
rx_detect_sep = re.compile("^(.*?)\d+$")
[docs] def expand_interface_range(self, s): """ Convert interface range expression to a list of interfaces "Gi 1/1-3,Gi 1/7" -> ["Gi 1/1", "Gi 1/2", "Gi 1/3", "Gi 1/7"] "1:1-3" -> ["1:1", "1:2", "1:3"] "1:1-1:3" -> ["1:1", "1:2", "1:3"] :param str s: Comma-separated list :returns: ["port", ..] :rtype: list """ r = set() for x in s.split(","): x = x.strip() if not x: continue if "-" in x: # Expand range f, t = [y.strip() for y in x.split("-")] # Detect common prefix match = self.rx_detect_sep.match(f) if not match: raise ValueError(x) prefix = match.group(1) # Detect range boundaries start = int(f[len(prefix):]) if is_int(t): stop = int(t) # Just integer else: if not t.startswith(prefix): raise ValueError(x) stop = int(t[len(prefix):]) # Prefixed if start > stop: raise ValueError(x) for i in range(start, stop + 1): r.add(prefix + str(i)) else: r.add(x) return sorted(r)
[docs] def macs_to_ranges(self, macs): """ Converts list of macs to rangea :param macs: Iterable yielding mac addresses :returns: [(from, to), ..] """ r = [] for m in sorted(MAC(x) for x in macs): if r: if r[-1][1].shift(1) == m: # Expand last range r[-1][1] = m else: r += [[m, m]] else: r += [[m, m]] return [(str(x[0]), str(x[1])) for x in r]
[docs] def hexstring_to_mac(self, s): """ Convert a 6-octet string to MAC address :param str s: 6-octet string :return: MAC Address :rtype: MAC """ return ":".join(["%02X" % ord(x) for x in s])
@property def root(self): """ Get root script :return :rtype: str """ if self.parent: return self.parent.root else: return self
[docs] def get_cache(self, key1, key2): """ Get cached result or raise KeyError :param key1 str: Cache key1 :param key2 str: Cache key2 :returns: Cache result :rtype: object """ s = self.root return s.call_cache[repr(key1)][repr(key2)]
[docs] def set_cache(self, key1, key2, value): """ Set cached result :param key1, key2: Cache key :param value: Value for write to cache """ key1 = repr(key1) key2 = repr(key2) s = self.root if key1 not in s.call_cache: s.call_cache[key1] = {} s.call_cache[key1][key2] = value
[docs] def configure(self): """ :return: Returns configuration context :rtype: object """ return ConfigurationContextManager(self)
[docs] def cached(self): """ Return cached context managed. All nested CLI and SNMP GET/GETNEXT calls will be cached. Usage: with self.cached(): self.cli(".....) self.scripts.script() """ return CacheContextManager(self)
[docs] def enter_config(self): """ Enter configuration mote :return: """ if self.profile.command_enter_config: self.cli(self.profile.command_enter_config)
[docs] def leave_config(self): """ Leave configuration mode :return: """ if self.profile.command_leave_config: self.cli(self.profile.command_leave_config) self.cli("") # Guardian empty command to wait until configuration is finally written
[docs] def save_config(self, immediately=False): """ Save current config :return: """ if immediately: if self.profile.command_save_config: self.cli(self.profile.command_save_config) else: self.schedule_to_save()
[docs] def schedule_to_save(self): self.need_to_save = True if self.parent: self.parent.schedule_to_save()
[docs] def set_motd(self, motd): """ Set _motd Attrinute - Message of The day :param str motd: :return: """ self._motd = motd
@property def motd(self): """ Return message of the day :return: Message of the day :rtype: str """ if self._motd: return self._motd else: return self.get_cli_stream().get_motd()
[docs] def re_match(self, rx, s, flags=0): """ Match s against regular expression rx using re.match Raise UnexpectedResultError if regular expression is not matched. Returns match object. rx can be string or compiled regular expression :return: re match object :rtype: object """ if isinstance(rx, basestring): rx = re.compile(rx, flags) match = rx.match(s) if match is None: raise self.UnexpectedResultError() return match
_match_lines_cache = {} @classmethod
[docs] def match_lines(cls, rx, s): k = id(rx) if k not in cls._match_lines_cache: _rx = [re.compile(l, re.IGNORECASE) for l in rx] cls._match_lines_cache[k] = _rx else: _rx = cls._match_lines_cache[k] ctx = {} idx = 0 r = _rx[0] for l in s.splitlines(): l = l.strip() match = r.search(l) if match: ctx.update(match.groupdict()) idx += 1 if idx == len(_rx): return ctx r = _rx[idx] return None
[docs] def find_re(self, iter, s): """ Find first matching regular expression or raise Unexpected result error :param iter: Iterable objecth for search :param re s: :return: :rtype: re.match """ for r in iter: if r.search(s): return r raise self.UnexpectedResultError()
[docs] def hex_to_bin(self, s): """ Convert hexadecimal string to boolean string. All non-hexadecimal characters are ignored :param s: Input string :return: Boolean string :rtype: basestring """ return "".join( self.hexbin[c] for c in "".join("%02x" % ord(d) for d in s) )
[docs] def push_prompt_pattern(self, pattern): self.get_cli_stream().push_prompt_pattern(pattern)
[docs] def pop_prompt_pattern(self): self.get_cli_stream().pop_prompt_pattern()
[docs] def has_oid(self, oid): """ Check object responses to oid :return: Check result :rtype: bool """ try: return bool(self.snmp.get(oid)) except self.snmp.TimeOutError: return False
[docs] def get_timeout(self): """ :return: TIMEOUT for script :rtype: int """ return self.TIMEOUT
[docs] def cli(self, cmd, command_submit=None, bulk_lines=None, list_re=None, cached=False, file=None, ignore_errors=False, nowait=False, obj_parser=None, cmd_next=None, cmd_stop=None): """ Execute CLI command and return result. Initiate cli session when necessary :param str cmd: CLI command to execute :param str command_submit: Set in Profile, submit enter command :param bulk_lines: Not use :param list_re: Format output :type: re.MatchObject :param bool cached: Cache result :param file file: Read cli from file :param bool ignore_errors: Ignore syntax error in commands :param bool nowait: Not use :return: if list_re is None, return a string :rtype: str :return: dict : if list_re is regular expression object, return a list of dicts (group name -> value), one dict per matched line :rtype: dict """ if file: with open(file) as f: return f.read() command_submit = command_submit or self.profile.command_submit stream = self.get_cli_stream() r = stream.execute(cmd + command_submit, obj_parser=obj_parser, cmd_next=cmd_next, cmd_stop=cmd_stop) if isinstance(r, basestring): if self.beef: self.beef.set_cli(cmd, r) # Check for syntax errors if not ignore_errors: # Check for syntax error if (self.profile.rx_pattern_syntax_error and self.profile.rx_pattern_syntax_error.search(r)): raise self.CLISyntaxError(r) # Then check for operation error if (self.profile.rx_pattern_operation_error and self.profile.rx_pattern_operation_error.search(r)): raise self.CLIOperationError(r) # Echo cancelation if r[:4096].lstrip().startswith(cmd): r = r.lstrip() if r.startswith(cmd + "\n"): # Remove first line r = self.strip_first_lines(r.lstrip()) else: # Some switches, like ProCurve do not send \n after the echo r = r[len(cmd):] # Convert to list of dicts if list_re is defined if list_re: x = [] for l in r.splitlines(): match = list_re.match(l.strip()) if match: x += [match.groupdict()] r = x return r
[docs] def get_cli_stream(self): if self.parent: return self.root.get_cli_stream() if not self.cli_stream: protocol = self.credentials.get("cli_protocol", "telnet") self.logger.debug("Open %s CLI", protocol) self.cli_stream = get_handler( self.cli_protocols[protocol] )(self, tos=self.tos) # Run session setup if self.profile.setup_session: self.logger.debug("Setup session") self.profile.setup_session(self) self.to_shutdown_session = bool(self.profile.shutdown_session) # Disable pager when nesessary if self.to_disable_pager: self.logger.debug("Disable paging") self.to_disable_pager = False if isinstance(self.profile.command_disable_pager, basestring): self.cli(self.profile.command_disable_pager, ignore_errors=True) elif isinstance(self.profile.command_disable_pager, list): for cmd in self.profile.command_disable_pager: self.cli(cmd, ignore_errors=True) else: raise self.UnexpectedResultError return self.cli_stream
[docs] def close_cli_stream(self): if self.parent: return if self.cli_stream: if self.to_shutdown_session: self.logger.debug("Shutdown session") self.profile.shutdown_session(self) self.cli_stream.close() self.cli_stream = None
[docs] def has_snmp(self): """ Check whether equipment has SNMP enabled :return: Check result :rtype: bool """ return bool(self.credentials.get("snmp_ro"))
[docs] def has_snmp_v1(self): """ Check whether equipment supports SNMP v1 :return: Check result :rtype: bool """ return self.has_capability("SNMP | v1")
[docs] def has_snmp_v2c(self): """ Check whether equipment supports SNMP v2c :return: Check result :rtype: bool """ return self.has_capability("SNMP | v2c")
[docs] def has_snmp_v3(self): """ Check whether equipment supports SNMP v3 :return: Check result :rtype: bool """ return self.has_capability("SNMP | v3")
[docs] def has_snmp_bulk(self): """ Check whether equipment supports SNMP BULK :return: Check result :rtype: bool """ return self.has_capability("SNMP | Bulk")
[docs] def has_capability(self, capability): """ Shech whether equipment supports capability :param str capability: Capability name :return: Check result :rtype: bool """ return bool(self.capabilities.get(capability))
[docs] def ignored_exceptions(self, iterable): """ Context manager to silently ignore specified exceptions :param: Iterable object :return: :rtype: object """ return IgnoredExceptionsContextManager(iterable)
[docs]class ScriptsHub(object): """ Object representing Script.scripts structure. Returns initialized child script which can be used ans callable """ class _CallWrapper(object): def __init__(self, script_class, parent): self.parent = parent self.script_class = script_class def __call__(self, **kwargs): return self.script_class( parent=self.parent, service=self.parent.service, args=kwargs, credentials=self.parent.credentials, capabilities=self.parent.capabilities, version=self.parent.version, timeout=self.parent.timeout ).run() def __init__(self, script): self._script = script def __getattr__(self, item): if item.startswith("_"): return self.__dict__[item] else: from loader import loader as script_loader sc = script_loader.get_script( "%s.%s" % (self._script.profile.name, item) ) if sc: return self._CallWrapper(sc, self._script) else: raise AttributeError(item) def __contains__(self, item): """ Check object has script name :param item: First param :type item: object :return: :rtype: bool """ from loader import loader as script_loader if "." not in item: # Normalize to full name item = "%s.%s" % (self._script.profile.name, item) return script_loader.has_script(item)