# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## SA Script base
##----------------------------------------------------------------------
## Copyright (C) 2007-2016 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
## Python modules
import re
import logging
import time
import itertools
import operator
## NOC modules
from snmp.base import SNMP
from snmp.beef import BeefSNMP
from http.base import HTTP
from noc.lib.log import PrefixLoggerAdapter
from noc.lib.validators import is_int
from context import (ConfigurationContextManager, CacheContextManager,
IgnoredExceptionsContextManager)
from noc.core.profile.loader import loader as profile_loader
from noc.core.handler import get_handler
from noc.lib.mac import MAC
from beef import Beef
class BaseScript(object):
    """
    Service Activation script base class
    """
    class __metaclass__(type):
        """
        Build the per-class execute chain out of handlers
        tagged by the @match decorator
        """
        def __new__(mcs, name, bases, attrs):
            script_class = type.__new__(mcs, name, bases, attrs)
            # Only attributes declared in this class body that carry
            # a _seq tag take part; the chain is ordered by _seq
            handlers = [h for h in attrs.values() if hasattr(h, "_seq")]
            handlers.sort(key=operator.attrgetter("_seq"))
            script_class._execute_chain = handlers
            return script_class

    name = None
    """
    Script name in form of <vendor>.<system>.<name>
    """
    TIMEOUT = 120
    """
    Default script timeout
    """
    cache = False
    """
    Enable call cache
    If True, script result will be cached and reused
    during lifetime of parent script
    """
    interface = None
    """
    Implemented interface
    """
    requires = []
    """
    Scripts required by generic script.
    For common scripts - empty list
    For generics - list of pairs (script_name, interface)
    """
    # `name` is still None at this point, so the logger is named "script"
    base_logger = logging.getLogger(name or "script")
    # Shared counter used to number @match-decorated handlers
    _x_seq = itertools.count()

    # Common errors
    class ScriptError(Exception):
        """
        Base class of all script errors
        """

    # LoginError = LoginError

    class CLISyntaxError(ScriptError):
        """
        CLI reported a syntax error
        """

    class CLIOperationError(ScriptError):
        """
        CLI reported an operational error
        """

    # CLITransportError = CLITransportError
    # CLIDisconnectedError = CLIDisconnectedError
    # TimeOutError = TimeOutError

    class NotSupportedError(ScriptError):
        """
        Feature is not supported
        """

    class UnexpectedResultError(ScriptError):
        """
        Unexpected result
        """

    # Hexadecimal digit -> 4-character binary string
    hexbin = dict(
        ("%x" % digit, "{0:04b}".format(digit)) for digit in range(16)
    )
    # CLI protocol name -> dotted path of the handler class
    cli_protocols = {
        "telnet": "noc.core.script.cli.telnet.TelnetCLI",
        "ssh": "noc.core.script.cli.ssh.SSHCLI",
        "beef": "noc.core.script.cli.beef.BeefCLI"
    }
def __init__(self, service, credentials,
             args=None, capabilities=None,
             version=None, parent=None, timeout=None,
             name=None, collect_beef=False):
    """
    Set up script state, transports and caches

    :param service: Owning service; its config supplies tos and pool
    :param credentials: Access credentials mapping, None means empty
    :param args: Script input arguments
    :param capabilities: Capabilities mapping
    :param version: Already known get_version result, if any
    :param parent: Calling script, None for the root script
    :param timeout: Timeout override, defaults to get_timeout()
    :param name: Script name override, defaults to the class name
    :param collect_beef: Create a Beef object to record the session
    """
    self.service = service
    self.tos = self.service.config.tos
    self.pool = self.service.config.pool
    self.parent = parent
    self._motd = None
    # Normalize before first use: the logger prefix below reads the
    # address, and credentials may legitimately be None
    self.credentials = credentials or {}
    name = name or self.name
    self.logger = PrefixLoggerAdapter(
        self.base_logger,
        "%s] [%s" % (self.name, self.credentials.get("address", "-"))
    )
    if self.parent:
        self.profile = self.parent.profile
    else:
        # Profile name is the first two components of the script name
        self.profile = profile_loader.get_profile(
            ".".join(name.split(".")[:2])
        )()
    self.version = version or {}
    self.capabilities = capabilities
    self.timeout = timeout or self.get_timeout()
    self.start_time = None
    self.args = self.clean_input(args or {})
    self.cli_stream = None
    if collect_beef:
        self.beef = Beef(script=self.name)
        self.logger.info("Collecting beef %s", self.beef.uuid)
    else:
        self.beef = None
    if self.parent:
        # Child scripts share the root's SNMP client and parent's beef
        self.snmp = self.root.snmp
        self.beef = self.parent.beef
    else:
        if self.credentials.get("beef"):
            self.snmp = BeefSNMP(self)
        else:
            self.snmp = SNMP(self, beef=self.beef)
    self.http = HTTP(self)
    self.to_disable_pager = not self.parent and self.profile.command_disable_pager
    self.to_shutdown_session = False
    self.scripts = ScriptsHub(self)
    self.is_cached = False  # Cache CLI and SNMP calls, if set
    # Suitable only when self.parent is None.
    # Cached results for scripts marked with "cache"
    self.call_cache = {}
    # Seed the cache so nested get_version calls reuse the known version
    if not parent and version and not name.endswith(".get_version"):
        self.logger.debug("Filling get_version cache with %s",
                          version)
        s = name.split(".")
        self.set_cache(
            "%s.%s.get_version" % (s[0], s[1]),
            {},
            version
        )
    # Profile-specific script setup hook
    if self.profile.setup_script:
        self.profile.setup_script(self)
def __call__(self, *args, **kwargs):
self.args = kwargs
return self.run()
[docs] def clean_output(self, result):
"""
Clean script result against interface
:param str result: Output from device
:return: Cleaned output
:rtype: str
"""
return self.interface().script_clean_result(self.profile, result)
[docs] def run(self):
"""
Run script
:return: Result return execute() of method
"""
self.start_time = time.time()
self.logger.info("Running. Input arguments: %s, timeout %s",
self.args, self.timeout)
# Use cached result when available
cache_hit = False
if self.cache and self.parent:
try:
result = self.get_cache(self.name, self.args)
self.logger.info("Using cached result")
cache_hit = True
except KeyError:
pass
# Execute script
if not cache_hit:
try:
result = self.execute(**self.args)
if self.cache and self.parent and result:
self.logger.info("Caching result")
self.set_cache(self.name, self.args, result)
finally:
if not self.parent:
# Close SNMP socket when necessary
self.snmp.close()
# Close CLI socket when necessary
self.close_cli_stream()
# Close HTTP Client
self.http.close()
# Clean result
result = self.clean_output(result)
self.logger.debug("Result: %s", result)
runtime = time.time() - self.start_time
self.logger.info("Complete (%.2fms)", runtime * 1000)
return result
@classmethod
[docs] def compile_match_filter(cls, *args, **kwargs):
"""
Compile arguments into version check function
:return: Returns callable accepting self and version hash arguments
"""
c = [lambda self, x, g=f: g(x) for f in args]
for k, v in kwargs.items():
# Split to field name and lookup operator
if "__" in k:
f, o = k.split("__")
else:
f = k
o = "exact"
# Check field name
if f not in ("vendor", "platform", "version", "image"):
raise Exception("Invalid field '%s'" % f)
# Compile lookup functions
if o == "exact":
c += [lambda self, x, f=f, v=v: x[f] == v]
elif o == "iexact":
c += [lambda self, x, f=f, v=v: x[f].lower() == v.lower()]
elif o == "startswith":
c += [lambda self, x, f=f, v=v: x[f].startswith(v)]
elif o == "istartswith":
c += [lambda self, x, f=f, v=v: x[f].lower().startswith(v.lower())]
elif o == "endswith":
c += [lambda self, x, f=f, v=v: x[f].endswith(v)]
elif o == "iendswith":
c += [lambda self, x, f=f, v=v: x[f].lower().endswith(v.lower())]
elif o == "contains":
c += [lambda self, x, f=f, v=v: v in x[f]]
elif o == "icontains":
c += [lambda self, x, f=f, v=v: v.lower() in x[f].lower()]
elif o == "in":
c += [lambda self, x, f=f, v=v: x[f] in v]
elif o == "regex":
c += [lambda self, x, f=f, v=re.compile(v): v.search(x[f]) is not None]
elif o == "iregex":
c += [lambda self, x, f=f, v=re.compile(v, re.IGNORECASE): v.search(x[f]) is not None]
elif o == "isempty": # Empty string or null
c += [lambda self, x, f=f, v=v: not x[f] if v else x[f]]
elif f == "version":
if o == "lt": # <
c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) < 0]
elif o == "lte": # <=
c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) <= 0]
elif o == "gt": # >
c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) > 0]
elif o == "gte": # >=
c += [lambda self, x, v=v: self.profile.cmp_version(x["version"], v) >= 0]
else:
raise Exception("Invalid lookup operation: %s" % o)
else:
raise Exception("Invalid lookup operation: %s" % o)
# Combine expressions into single lambda
return reduce(
lambda x, y: lambda self, v, x=x, y=y: (
x(self, v) and y(self, v)
),
c,
lambda self, x: True
)
@classmethod
[docs] def match(cls, *args, **kwargs):
"""
execute method decorator
"""
def wrap(f):
# Append to the execute chain
if hasattr(f, "_match"):
old_filter = f._match
f._match = lambda self, v, old_filter=old_filter, new_filter=new_filter: new_filter(self, v) or old_filter(self, v)
else:
f._match = new_filter
f._seq = cls._x_seq.next()
return f
# Compile check function
new_filter = cls.compile_match_filter(*args, **kwargs)
# Return decorated function
return wrap
[docs] def match_version(self, *args, **kwargs):
"""
inline version for BaseScript.match
"""
if not self.version:
self.version = self.scripts.get_version()
return self.compile_match_filter(*args, **kwargs)(
self,
self.version
)
[docs] def execute(self, **kwargs):
"""
Default script behavior:
Pass through _execute_chain and call appropriative handler
"""
if self._execute_chain and not self.name.endswith(".get_version"):
# Get version information
if not self.version:
self.version = self.scripts.get_version()
# Find and execute proper handler
for f in self._execute_chain:
if f._match(self, self.version):
return f(self, **kwargs)
# Raise error
raise self.NotSupportedError()
[docs] def cleaned_config(self, config):
"""
Clean up config from all unnecessary trash
:param str config: Configuration for clean
:return: Clean up config from all unnecessary trash
:rtype: str
"""
return self.profile.cleaned_config(config)
def strip_first_lines(self, text, lines=1):
    """
    Drop the first *lines* lines of text

    :param str text: Text
    :param int lines: Number of leading lines to drop
    :return: Remaining text, empty string when nothing is left
    :rtype: str
    """
    rows = text.split("\n")
    return "\n".join(rows[lines:]) if len(rows) > lines else ""
def expand_rangelist(self, s):
    """
    Expand expressions like "1,2,5-7" to [1, 2, 5, 6, 7]

    Empty items are skipped, reversed ranges ("7-5") are accepted,
    duplicates collapse.

    :param str s: Comma-separated list of integers and ranges
    :return: Sorted list of unique integers
    :rtype: list
    """
    numbers = set()
    for item in s.split(","):
        item = item.strip()
        if item == "":
            continue
        if "-" not in item:
            numbers.add(int(item))
            continue
        first, last = [int(part) for part in item.split("-")]
        # Bounds may come in either order
        low, high = min(first, last), max(first, last)
        numbers.update(range(low, high + 1))
    return sorted(numbers)
# Splits an interface name into a non-numeric prefix and trailing digits
rx_detect_sep = re.compile(r"^(.*?)\d+$")

def expand_interface_range(self, s):
    """
    Convert interface range expression to a list
    of interfaces
    "Gi 1/1-3,Gi 1/7" -> ["Gi 1/1", "Gi 1/2", "Gi 1/3", "Gi 1/7"]
    "1:1-3" -> ["1:1", "1:2", "1:3"]
    "1:1-1:3" -> ["1:1", "1:2", "1:3"]

    :param str s: Comma-separated list
    :returns: Sorted list of unique interface names
    :rtype: list
    :raise ValueError: On a malformed or descending range
    """
    names = set()
    for item in s.split(","):
        item = item.strip()
        if not item:
            continue
        if "-" not in item:
            names.add(item)
            continue
        # Expand range
        first, last = [part.strip() for part in item.split("-")]
        # Everything before the trailing digits is the common prefix
        found = self.rx_detect_sep.match(first)
        if not found:
            raise ValueError(item)
        prefix = found.group(1)
        start = int(first[len(prefix):])
        if is_int(last):
            # Right bound is a bare number
            stop = int(last)
        elif last.startswith(prefix):
            # Right bound repeats the prefix
            stop = int(last[len(prefix):])
        else:
            raise ValueError(item)
        if start > stop:
            raise ValueError(item)
        names.update(prefix + str(i) for i in range(start, stop + 1))
    return sorted(names)
def macs_to_ranges(self, macs):
    """
    Convert a collection of MACs to ranges of consecutive addresses

    :param macs: Iterable yielding mac addresses
    :returns: [(from, to), ..] as strings
    """
    ranges = []
    for mac in sorted(MAC(x) for x in macs):
        if ranges and ranges[-1][1].shift(1) == mac:
            # Directly follows the end of the last range: extend it
            ranges[-1][1] = mac
        else:
            ranges.append([mac, mac])
    return [(str(first), str(last)) for first, last in ranges]
def hexstring_to_mac(self, s):
    """
    Render a string of octets as colon-separated uppercase hex

    :param str s: Octet string (6 octets for a MAC address)
    :return: MAC address like "00:1A:2B:3C:4D:5E"
    :rtype: str
    """
    octets = ["%02X" % ord(ch) for ch in s]
    return ":".join(octets)
@property
def root(self):
"""
Get root script
:return
:rtype: str
"""
if self.parent:
return self.parent.root
else:
return self
[docs] def get_cache(self, key1, key2):
"""
Get cached result or raise KeyError
:param key1 str: Cache key1
:param key2 str: Cache key2
:returns: Cache result
:rtype: object
"""
s = self.root
return s.call_cache[repr(key1)][repr(key2)]
[docs] def set_cache(self, key1, key2, value):
"""
Set cached result
:param key1, key2: Cache key
:param value: Value for write to cache
"""
key1 = repr(key1)
key2 = repr(key2)
s = self.root
if key1 not in s.call_cache:
s.call_cache[key1] = {}
s.call_cache[key1][key2] = value
def cached(self):
    """
    Return a CacheContextManager bound to this script.
    Per the original documentation, nested CLI and SNMP GET/GETNEXT
    calls are cached while the context is active.
    Usage:
    with self.cached():
        self.cli(".....")
        self.scripts.script()
    """
    manager = CacheContextManager(self)
    return manager
[docs] def enter_config(self):
"""
Enter configuration mote
:return:
"""
if self.profile.command_enter_config:
self.cli(self.profile.command_enter_config)
[docs] def leave_config(self):
"""
Leave configuration mode
:return:
"""
if self.profile.command_leave_config:
self.cli(self.profile.command_leave_config)
self.cli("") # Guardian empty command to wait until configuration is finally written
[docs] def save_config(self, immediately=False):
"""
Save current config
:return:
"""
if immediately:
if self.profile.command_save_config:
self.cli(self.profile.command_save_config)
else:
self.schedule_to_save()
[docs] def schedule_to_save(self):
self.need_to_save = True
if self.parent:
self.parent.schedule_to_save()
[docs] def set_motd(self, motd):
"""
Set _motd Attrinute - Message of The day
:param str motd:
:return:
"""
self._motd = motd
@property
def motd(self):
"""
Return message of the day
:return: Message of the day
:rtype: str
"""
if self._motd:
return self._motd
else:
return self.get_cli_stream().get_motd()
[docs] def re_search(self, rx, s, flags=0):
"""
Match s against regular expression rx using re.search
Raise UnexpectedResultError if regular expression is not matched.
Returns match object.
rx can be string or compiled regular expression
:return: re match object
:rtype: re.match
"""
if isinstance(rx, basestring):
rx = re.compile(rx, flags)
match = rx.search(s)
if match is None:
raise self.UnexpectedResultError()
return match
[docs] def re_match(self, rx, s, flags=0):
"""
Match s against regular expression rx using re.match
Raise UnexpectedResultError if regular expression is not matched.
Returns match object.
rx can be string or compiled regular expression
:return: re match object
:rtype: object
"""
if isinstance(rx, basestring):
rx = re.compile(rx, flags)
match = rx.match(s)
if match is None:
raise self.UnexpectedResultError()
return match
_match_lines_cache = {}
@classmethod
[docs] def match_lines(cls, rx, s):
k = id(rx)
if k not in cls._match_lines_cache:
_rx = [re.compile(l, re.IGNORECASE) for l in rx]
cls._match_lines_cache[k] = _rx
else:
_rx = cls._match_lines_cache[k]
ctx = {}
idx = 0
r = _rx[0]
for l in s.splitlines():
l = l.strip()
match = r.search(l)
if match:
ctx.update(match.groupdict())
idx += 1
if idx == len(_rx):
return ctx
r = _rx[idx]
return None
[docs] def find_re(self, iter, s):
"""
Find first matching regular expression
or raise Unexpected result error
:param iter: Iterable objecth for search
:param re s:
:return:
:rtype: re.match
"""
for r in iter:
if r.search(s):
return r
raise self.UnexpectedResultError()
[docs] def hex_to_bin(self, s):
"""
Convert hexadecimal string to boolean string.
All non-hexadecimal characters are ignored
:param s: Input string
:return: Boolean string
:rtype: basestring
"""
return "".join(
self.hexbin[c] for c in
"".join("%02x" % ord(d) for d in s)
)
[docs] def push_prompt_pattern(self, pattern):
self.get_cli_stream().push_prompt_pattern(pattern)
[docs] def pop_prompt_pattern(self):
self.get_cli_stream().pop_prompt_pattern()
[docs] def has_oid(self, oid):
"""
Check object responses to oid
:return: Check result
:rtype: bool
"""
try:
return bool(self.snmp.get(oid))
except self.snmp.TimeOutError:
return False
[docs] def get_timeout(self):
"""
:return: TIMEOUT for script
:rtype: int
"""
return self.TIMEOUT
[docs] def cli(self, cmd, command_submit=None, bulk_lines=None,
list_re=None, cached=False, file=None, ignore_errors=False,
nowait=False, obj_parser=None, cmd_next=None, cmd_stop=None):
"""
Execute CLI command and return result. Initiate cli session
when necessary
:param str cmd: CLI command to execute
:param str command_submit: Set in Profile, submit enter command
:param bulk_lines: Not use
:param list_re: Format output
:type: re.MatchObject
:param bool cached: Cache result
:param file file: Read cli from file
:param bool ignore_errors: Ignore syntax error in commands
:param bool nowait: Not use
:return: if list_re is None, return a string
:rtype: str
:return: dict : if list_re is regular expression object, return a list of dicts (group name -> value), one dict per matched line
:rtype: dict
"""
if file:
with open(file) as f:
return f.read()
command_submit = command_submit or self.profile.command_submit
stream = self.get_cli_stream()
r = stream.execute(cmd + command_submit, obj_parser=obj_parser,
cmd_next=cmd_next, cmd_stop=cmd_stop)
if isinstance(r, basestring):
if self.beef:
self.beef.set_cli(cmd, r)
# Check for syntax errors
if not ignore_errors:
# Check for syntax error
if (self.profile.rx_pattern_syntax_error and
self.profile.rx_pattern_syntax_error.search(r)):
raise self.CLISyntaxError(r)
# Then check for operation error
if (self.profile.rx_pattern_operation_error and
self.profile.rx_pattern_operation_error.search(r)):
raise self.CLIOperationError(r)
# Echo cancelation
if r[:4096].lstrip().startswith(cmd):
r = r.lstrip()
if r.startswith(cmd + "\n"):
# Remove first line
r = self.strip_first_lines(r.lstrip())
else:
# Some switches, like ProCurve do not send \n after the echo
r = r[len(cmd):]
# Convert to list of dicts if list_re is defined
if list_re:
x = []
for l in r.splitlines():
match = list_re.match(l.strip())
if match:
x += [match.groupdict()]
r = x
return r
[docs] def get_cli_stream(self):
if self.parent:
return self.root.get_cli_stream()
if not self.cli_stream:
protocol = self.credentials.get("cli_protocol", "telnet")
self.logger.debug("Open %s CLI", protocol)
self.cli_stream = get_handler(
self.cli_protocols[protocol]
)(self, tos=self.tos)
# Run session setup
if self.profile.setup_session:
self.logger.debug("Setup session")
self.profile.setup_session(self)
self.to_shutdown_session = bool(self.profile.shutdown_session)
# Disable pager when nesessary
if self.to_disable_pager:
self.logger.debug("Disable paging")
self.to_disable_pager = False
if isinstance(self.profile.command_disable_pager, basestring):
self.cli(self.profile.command_disable_pager,
ignore_errors=True)
elif isinstance(self.profile.command_disable_pager, list):
for cmd in self.profile.command_disable_pager:
self.cli(cmd, ignore_errors=True)
else:
raise self.UnexpectedResultError
return self.cli_stream
[docs] def close_cli_stream(self):
if self.parent:
return
if self.cli_stream:
if self.to_shutdown_session:
self.logger.debug("Shutdown session")
self.profile.shutdown_session(self)
self.cli_stream.close()
self.cli_stream = None
[docs] def has_snmp(self):
"""
Check whether equipment has SNMP enabled
:return: Check result
:rtype: bool
"""
return bool(self.credentials.get("snmp_ro"))
[docs] def has_snmp_v1(self):
"""
Check whether equipment supports SNMP v1
:return: Check result
:rtype: bool
"""
return self.has_capability("SNMP | v1")
[docs] def has_snmp_v2c(self):
"""
Check whether equipment supports SNMP v2c
:return: Check result
:rtype: bool
"""
return self.has_capability("SNMP | v2c")
[docs] def has_snmp_v3(self):
"""
Check whether equipment supports SNMP v3
:return: Check result
:rtype: bool
"""
return self.has_capability("SNMP | v3")
[docs] def has_snmp_bulk(self):
"""
Check whether equipment supports SNMP BULK
:return: Check result
:rtype: bool
"""
return self.has_capability("SNMP | Bulk")
[docs] def has_capability(self, capability):
"""
Shech whether equipment supports capability
:param str capability: Capability name
:return: Check result
:rtype: bool
"""
return bool(self.capabilities.get(capability))
def ignored_exceptions(self, iterable):
    """
    Context manager to silently ignore specified exceptions

    :param iterable: Passed as is to IgnoredExceptionsContextManager
    :return: IgnoredExceptionsContextManager instance
    """
    manager = IgnoredExceptionsContextManager(iterable)
    return manager
class ScriptsHub(object):
    """
    Object representing Script.scripts structure.
    Attribute access returns a callable which runs the named script
    of the same profile as a child of the owning script
    """
    class _CallWrapper(object):
        """
        Callable which instantiates and runs a child script
        """
        def __init__(self, script_class, parent):
            self.parent = parent
            self.script_class = script_class

        def __call__(self, **kwargs):
            # The child inherits service, credentials, capabilities,
            # version and timeout from the calling script
            parent = self.parent
            child = self.script_class(
                parent=parent,
                service=parent.service,
                args=kwargs,
                credentials=parent.credentials,
                capabilities=parent.capabilities,
                version=parent.version,
                timeout=parent.timeout
            )
            return child.run()

    def __init__(self, script):
        self._script = script

    def __getattr__(self, item):
        """
        Resolve ``hub.<name>`` to a wrapper around script
        ``<profile name>.<name>``

        :raise AttributeError: For unknown scripts and for any
            underscore-prefixed name
        """
        if item.startswith("_"):
            # __getattr__ is consulted only after normal lookup has
            # failed, so a private name is simply missing. It must be
            # AttributeError (not KeyError) for getattr()/hasattr()
            # with defaults to work.
            raise AttributeError(item)
        from loader import loader as script_loader
        sc = script_loader.get_script(
            "%s.%s" % (self._script.profile.name, item)
        )
        if not sc:
            raise AttributeError(item)
        return self._CallWrapper(sc, self._script)

    def __contains__(self, item):
        """
        Check whether the script exists

        :param item: Script name, short or full (dotted)
        :return: Result of the loader's has_script()
        """
        from loader import loader as script_loader
        if "." not in item:
            # Normalize to full name
            item = "%s.%s" % (self._script.profile.name, item)
        return script_loader.has_script(item)