"""Helper module for IPAM WAPI scripts written in Python 3 Module parameters can be automatically read from environment variables and/or from a config file like this:: IPAM_WAPI https://ipam.illinois.edu/wapi/v2.7.3 IPAM_USERNAME janedoe IPAM_PASSWORD correcthorsebatterystaple IPAM_USE_GPG_AGENT 0 You can also supply these parameters to the constructor (e.g. after reading the values from somewhere else). Example:: import ipam s = ipam.Session(debug_enable=1) #s = ipam.Session(IPAM_CONFIG_FILE="/path/to/config") WAPI = s.WAPI existing_hosts_for_fqdn = s.get(WAPI+'/record:host', params={ 'name:': fqdn }).json() See also: * ``add_host_ip.py`` (example script which uses this module) * `requests `_ """ # Author: dmrz __version__ = "1.0" import requests import os import sys import configparser import itertools import urllib.parse import getpass import re import subprocess from distutils.util import strtobool class Session(requests.Session): """A requests.Session with helpful customizations for making IPAM WAPI requests.""" @property def WAPI(self): """WAPI base URL, for use in constructing requests""" return self._cfg['IPAM_WAPI'] def __init__(self, debug_enable=0, **kwargs): """Set debug_enable=1 or greater to print requests and responses. IPAM_* kwargs override values read from environment or config file. IPAM_CONFIG_FILE overrides location of config file. """ self._cfg = { # Location of config file (defaults to ~/.ipam_config) "IPAM_CONFIG_FILE": kwargs.get('IPAM_CONFIG_FILE', os.environ.get('IPAM_CONFIG_FILE', os.path.join(os.environ.get('HOME', ""), '.ipam_config'))), # WAPI base URL including version "IPAM_WAPI": None, # IPAM credentials (if unset, will prompt user) "IPAM_USERNAME": None, "IPAM_PASSWORD": None, # If set to 1 (or any other true value), then instead of # prompting for password in tty, we'll try to request it from a # running gpg-agent (using gpg-connect-agent, which must be in # your path). This provides a way to cache your password # across multiple script invocations without storing it in a # file or environment variable. Safety not guaranteed; use at # your own risk. "IPAM_USE_GPG_AGENT": 0, } # read config file, overriding defaults if self._cfg['IPAM_CONFIG_FILE'] is not None \ and os.access(self._cfg['IPAM_CONFIG_FILE'], os.R_OK): with open(self._cfg['IPAM_CONFIG_FILE']) as f: # fake a dummy section header f = itertools.chain(("[root]",), f) # whitespace delimited cfp = configparser.ConfigParser(delimiters=(' ')) # preserve case cfp.optionxform = lambda k: k cfp.read_file(f) for k, v in cfp['root'].items(): self._cfg[k] = v # environment variables and kwargs supersede config file for k in self._cfg: if k in os.environ: self._cfg[k] = os.getenv(k) if k in kwargs: self._cfg[k] = kwargs.pop(k) # note IPAM_* kwargs were removed above super().__init__(**kwargs) if debug_enable >= 1: def debug_hook_request(r, *args, **kwargs): print("DEBUG REQUEST: {} {}".format(r.request.method, r.request.url), file=sys.stderr) if r.request.body: print(r.request.body.decode('utf-8'), file=sys.stderr) self.hooks['response'].append(debug_hook_request) # Automatically raise an exception when a WAPI request fails def raise_for_failure(r, *args, **kwargs): if not r.ok: raise requests.HTTPError( "WAPI {} request for '{}' failed with {} '{}':\n{}" .format(r.request.method, r.request.url, r.status_code, r.reason, r.text)) self.hooks['response'].append(raise_for_failure) if debug_enable >= 1: def debug_hook_response(r, *args, **kwargs): print("DEBUG RESPONSE: {}".format(r.text), file=sys.stderr) self.hooks['response'].append(debug_hook_response) if not self._cfg['IPAM_WAPI']: raise Exception("IPAM_WAPI is not set (in environment or config file)") username = self._cfg['IPAM_USERNAME'] password = self._cfg['IPAM_PASSWORD'] netloc = urllib.parse.urlparse(self.WAPI).netloc # prompt user for unknown username and/or password while not username: username = input('Enter username for {}: '.format(netloc)) # don't use preconfigured password without preconfigured username password = None while not password: # note IPAM_USE_GPG_AGENT may be int or string (0 vs "0") if strtobool(str(self._cfg['IPAM_USE_GPG_AGENT'])): # Try to ask gpg-agent for a password. See # https://www.gnupg.org/documentation/manuals/gnupg/Agent-GET_005fPASSPHRASE.html if os.environ.get('GPG_AGENT_INFO'): principal = username+'@'+netloc principal = re.sub(r'[^\w\-\.:@]', '', principal) # remove any undesirable characters gpgagentcmd = "get_passphrase IPAM:{0} X X {0}".format(principal) pwhex = subprocess.check_output( "gpg-connect-agent '{}' '/bye'".format(gpgagentcmd), shell=True ).decode('utf-8') match = re.match(r'OK (.*)', pwhex) if match: password = bytes.fromhex(match.group(1)).decode('utf-8') else: print("GPG_AGENT_INFO not found in environment; try running `gpg-agent --daemon` first?", file=sys.stderr) if not password: password = getpass.getpass(prompt='Enter password for {} on {}: '.format(username, netloc)) # Provide credentials. Note that requests.Session gives us cookie # persistence for free, meaning we will automatically use the ibapauth # cookie for subsequent requests in the same session. self.auth = (username, password)