# Copyright 2016 # The Board of Trustees of the Leland Stanford Junior University # Stanford Research Computing Center # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Stanford University Workgroup Python bindings Module to manage workgroups through the Restful Web Service API manager = SUWorkgroupManager("https://workgroupsvc.stanford.edu/v1", CERT, CERTKEY) workgroup = manager.workgroup(name) workgroup = manager.create_workgroup(name) manager.delete_workgroup(name) READ ONLY GETTERS ----------------- workgroup.members -> list of sunetid string workgroup.applications -> list of application id workgroup.workgroups -> list of workgroup objects workgroup.administrators -> list of sunetid strings workgroup.admin_apps -> list of admin app id workgroup.admin_workgroups -> list of workgroup objects READ/WRITE PROPERTIES -----------=======---- workgroup.description (string) workgroup.filter (string) workgroup.reusable (boolean) workgroup.visibility (string) workgroup.privgroup (boolean) EDIT MEMBERS ------------ workgroup.add_member('sthiell') workgroup.add_workgroup(child_workgroup) workgroup.remove_member('sthiell') workgroup.remove_workgroup(child_workgroup) EDIT ADMINS ----------- workgroup.add_administrator('sthiell') workgroup.add_admin_workgroup(admin_workgroup) workgroup.remove_administrator('sthiell') workgroup.remove_admin_workgroup(admin_workgroup) """ __author__ = 'sthiell@stanford.edu (Stephane Thiell)' import os, sys, requests, logging from textwrap import dedent import xml.etree.ElementTree as ET from xml.parsers.expat import ExpatError import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) logging.basicConfig( stream=sys.stdout, level=os.environ.get("LOGLEVEL", "INFO"), format="%(levelname)s %(message)s", ) logger = logging.getLogger(__name__) DEFAULT_BASE_URL = "https://workgroupsvc.stanford.edu/v1" # Workgroup filter property FILTER_NONE = 'NONE' FILTER_ACADEMIC_ADMINISTRATIVE = 'ACADEMIC_ADMINISTRATIVE' FILTER_STUDENT = 'STUDENT' FILTER_FACULTY = 'FACULTY' FILTER_STAFF = 'STAFF' FILTER_FACULTY_STAFF = 'FACULTY_STAFF' FILTER_FACULTY_STUDENT = 'FACULTY_STUDENT' FILTER_STAFF_STUDENT = 'STAFF_STUDENT' FILTER_FACULTY_STAFF_STUDENT = 'FACULTY_STAFF_STUDENT' # Workgroup visibility property VISIBILITY_STANFORD = 'STANFORD' VISIBILITY_PRIVATE = 'PRIVATE' class SUWorkgroupAPIError(Exception): """SU Workgroup API Error Exception""" def __init__(self, status_code, text): """initialize API error exception""" self.status_code = status_code oneline = '|'.join([line.strip() for line in text.splitlines()]) Exception.__init__(self, oneline) def __str__(self): return "Error %d: %s" % (self.status_code, self.args[0]) class SUWorkgroup(object): """Stanford University Workgroup Provides accessors and workgroup modifiers. Do not instantiate directly: use SUWorkgroupManager.workgroup(). """ def __init__(self, name, manager): """Initializer - called by SUWorkgroupManager""" # workgroup properties self.name = name self._description = '' self._filter = FILTER_NONE self._visibility = VISIBILITY_STANFORD self._reusable = True self._privgroup = False # workgroup members self.members = set() self.applications = set() self.workgroups = {} # workgroup administrators self.administrators = set() self.admin_apps = set() self.admin_workgroups = {} # workgroup manager (SUWorkgroupManager) self.manager = manager @property def description(self): """Workgroup description property (string).""" return self._description @description.setter def description(self, description): """Update workgroup description property (string).""" self.manager._request_update_property(self.name, 'description', description) self._description = description @property def filter(self): """Workgroup filter property (string).""" return self._filter @filter.setter def filter(self, filter): """Update workgroup filter property (string).""" self.manager._request_update_property(self.name, 'filter', filter) self._filter = filter @property def reusable(self): """Workgroup reusable property (boolean).""" return self._reusable @reusable.setter def reusable(self, reusable): """Update workgroup reusable property (boolean).""" if reusable: reusable_str = 'TRUE' else: reusable_str = 'FALSE' self.manager._request_update_property(self.name, 'reusable', reusable_str) self._reusable = bool(reusable) @property def visibility(self): """Workgroup visibility property (string).""" return self._visibility @visibility.setter def visibility(self, visibility): """Update workgroup visibility property (string).""" self.manager._request_update_property(self.name, 'visibility', visibility) self._visibility = visibility @property def privgroup(self): """Workgroup privgroup property (boolean).""" return self._privgroup @privgroup.setter def privgroup(self, privgroup): """Update workgroup privgroup property (boolean).""" if privgroup: privgroup_str = 'TRUE' else: privgroup_str = 'FALSE' self.manager._request_update_property(self.name, 'privgroup', privgroup_str) self._privgroup = bool(privgroup) def add_member(self, member): """Add an user (string) as member of workgroup object.""" self.manager._request('put', self.name, member, nested_workgroup=False, admin=False) self.members.add(member) def remove_member(self, member): """Remove an member user (string) from workgroup object.""" self.manager._request('delete', self.name, member, nested_workgroup=False, admin=False) self.members.remove(member) def add_administrator(self, admin_member): """Add an user (string) as administrator of workgroup object.""" self.manager._request('put', self.name, admin_member, nested_workgroup=False, admin=True) self.administrators.add(admin_member) def remove_administrator(self, admin_member): """Remove an administrator (string) from workgroup object.""" self.manager._request('delete', self.name, admin_member, nested_workgroup=False, admin=True) self.administrators.remove(admin_member) def add_workgroup(self, workgroup): """ Add workgroup as a member of workgroup object (nested workgroup). workgroup should be a valid SUWorkgroup object """ self.manager._request('put', self.name, workgroup.name, nested_workgroup=True, admin=False) self.workgroups[workgroup.name] = workgroup def remove_workgroup(self, workgroup): """ Remove a workgroup member from workgroup object (nested workgroup). workgroup should be a valid SUWorkgroup object """ self.manager._request('delete', self.name, workgroup.name, nested_workgroup=True, admin=False) del self.workgroups[workgroup.name] def add_admin_workgroup(self, workgroup): """ Add workgroup as an admin of workgroup object. workgroup may be a valid SUWorkgroup object or a string. """ self.manager._request('put', self.name, workgroup.name, nested_workgroup=True, admin=True) self.admin_workgroups[workgroup.name] = workgroup def remove_admin_workgroup(self, workgroup): """ Remove admin workgroup from workgroup object. workgroup may be a valid SUWorkgroup object or a string. """ self.manager._request('delete', self.name, workgroup.name, nested_workgroup=True, admin=True) del self.admin_workgroups[workgroup.name] class SUWorkgroupManager(object): """Stanford University Workgroup Manager class Class used to get and operate on workgroups. Starting -------- Create SUWorkgroupManager object with proper base_url and cert info. Getting an existing workgroup ----------------------------- Use the workgroup(name) method to get an SUWorkgroup object, you may then access workgroup data or modify workgroup data using SUWorkgroup methods. Creating a new workgroup ------------------------ Use create_workgroup(name, ...) to create a new workgroup. This method also returns a new SUWorkgroup object that you can use. Deleting a workgroup -------------------- Use delete_workgroup(name) to delete a workgroup. At the time of writing, it is not possible to reuse a deleted workgroup. Deleting cached workgroups -------------------------- This class currently caches workgroups. use clear() to clear caches. """ def __init__(self, base_url=DEFAULT_BASE_URL, cert=None, cert_key=None): """Initializer, you probably want to pass valid cert/cert_key info.""" self.base_url = base_url self.cert = cert self.cert_key = cert_key self.workgroups = {} # cached workgroups def clear(self): """Clear cached workgroups.""" self.workgroups = {} def _request_workgroup(self, workgroup_name, depth=0): """load""" if workgroup_name in self.workgroups: # cache hit return self.workgroups[workgroup_name] res = requests.get("%s/workgroups/%s" % (self.base_url, workgroup_name), verify=False, cert=(self.cert, self.cert_key)) self._check_response(res) # 200 OK root = ET.fromstring(res.text) workgroup = SUWorkgroup(workgroup_name, self) self.workgroups[workgroup_name] = workgroup for child in root: if child.tag == "description": workgroup._description = child.text elif child.tag == "filter": workgroup._filter = child.text or FILTER_NONE elif child.tag == "visibility": workgroup._visibility = child.text elif child.tag == "reusable": workgroup._reusable = (child.text == 'TRUE') elif child.tag == "privgroup": workgroup._privgroup = (child.text == 'TRUE') elif child.tag == "members": for member in child: if member.tag == "workgroup": child_name = member.attrib['id'] child_workgroup = self._request_workgroup(child_name, depth + 1) assert child_name not in workgroup.workgroups workgroup.workgroups[child_name] = child_workgroup elif member.tag == "member": workgroup.members.add(member.attrib['id']) elif member.tag == "application": workgroup.applications.add(member.attrib['id']) elif child.tag == "administrators": for member in child: if member.tag == "workgroup": child_name = member.attrib['id'] child_workgroup = self._request_workgroup(child_name, depth + 1) assert child_name not in workgroup.admin_workgroups workgroup.admin_workgroups[child_name] = child_workgroup elif member.tag == "member": workgroup.administrators.add(member.attrib['id']) elif member.tag == "application": workgroup.admin_apps.add(member.attrib['id']) return workgroup def _request(self, reqtype, workgroup_name, member, nested_workgroup=False, admin=False): """ Generic Request for Workgroup operation (put, delete) reqtype is the request type like put, delete (match requests.xx calls) workgroup_name is the name of workgroup we operate on member is the name of the new member nested_workgroup: whether the member is a workgroup admin: whether to add new member as an admin of workgroup """ if nested_workgroup: member_objtype = 'workgroups' else: member_objtype = 'users' if admin: section = 'administrators' query_keyword = 'administrator' else: section = 'members' query_keyword = 'user' payload = {query_keyword: '%s/%s/%s' % (self.base_url, member_objtype, member)} reqfunc = getattr(requests, reqtype) res = reqfunc("%s/workgroups/%s/%s" % (self.base_url, workgroup_name, section), verify=False, cert=(self.cert, self.cert_key), params=payload) self._check_response(res) def _request_update_property(self, workgroup_name, property, value): """ Generic Request for property change on a workgroup (description, filter, etc.) workgroup_name is the name of workgroup we operate on property is the name of the workgroup property to update value is the new workgroup property value """ headers = {'Content-Type': 'text/plain;charset=UTF-8'} res = requests.put("%s/workgroups/%s/%s" % (self.base_url, workgroup_name, property), verify=False, cert=(self.cert, self.cert_key), data=value, headers=headers) self._check_response(res) def _check_response(self, response): """Check HTTP response""" logger.debug("Response: {}".format(response.text)) code = response.status_code errmsg = '' try: if len(response.text) > 0: root = ET.fromstring(response.text) if root.tag == "error": for child in root: if child.tag == "message": errmsg = child.text except ExpatError: pass if code >= 300 or errmsg: raise SUWorkgroupAPIError(code, errmsg) def workgroup(self, name): """Get workgroup object from given workgroup name.""" assert name if name not in self.workgroups: return self._request_workgroup(name) return self.workgroups[name] def create_workgroup(self, name, description, filter=FILTER_NONE, visibility=VISIBILITY_STANFORD, reusable=True, privgroup=True): """Create new workgroup object with given name.""" assert name assert description payloadfmt = dedent(""" <workgroup> <description>%s</description> <filter>%s</filter> <visibility>%s</visibility> <reusable>%s</reusable> <privgroup>%s</privgroup> </workgroup>""") payload = payloadfmt % (description, filter, visibility, str(reusable).upper(), str(privgroup).upper()) headers = {'Content-Type': 'text/xml;charset=UTF-8'} res = requests.post("%s/workgroups/%s" % (self.base_url, name), verify=False, cert=(self.cert, self.cert_key), data=payload, headers=headers) if res.status_code not in (requests.codes.ok, requests.codes.created): raise SUWorkgroupAPIError(res.status_code, res.text) return self.workgroup(name) def delete_workgroup(self, name): """Delete workgroup object of given name.""" assert name res = requests.delete("%s/workgroups/%s" % (self.base_url, name), verify=False, cert=(self.cert, self.cert_key)) if res.status_code != requests.codes.ok: raise SUWorkgroupAPIError(res.status_code, res.text)