On Friday August 14th. from 9 p.m. to 9:30 p.m., we will upgrade GitLab software and its cloud platform release. Service may not be available during this maintenance window. Please schedule your work accordingly.

...
 
Commits (2)
*.pyc
*.class
extra/
env/
config/*
nbproject/
.vscode/
.DS_Store
.cache
*.key
*.ini
*.pem
#!/usr/bin/env python3
#!env/bin/python3
# -*- coding: utf-8 -*-
__author__ = 'UIT ET IEDO - Stanford University <uit-et-eit-iedo-staff@office365stanford.onmicrosoft.com>'
__license__ = 'Apache License 2.0'
__author__ = "UIT ET IEDO - Stanford University <uit-et-eit-iedo-staff@office365stanford.onmicrosoft.com>"
__license__ = "Apache License 2.0"
"""
suafs2gdrive
~~~~~~~~
......@@ -10,60 +10,153 @@ __license__ = 'Apache License 2.0'
"""
import sys, os, argparse, platform, subprocess, re
import logging
logging.basicConfig(stream=sys.stdout, level=os.environ.get("LOGLEVEL", "INFO"), format='%(levelname)s %(message)s')
logging.basicConfig(
stream=sys.stdout,
level=os.environ.get("LOGLEVEL", "INFO"),
format="%(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
class SuAFS2gDrive(object):
AFS_USER_HOME=None
AFS_HOME_OWNER=None
OS=None
HOMES_PATH = {'Darwin': '/afs/ir/users/'}
SUNET=None
class SuDatastore(object):
SUNET = None
HOME = None
OS = None
OS_SPECIFICS = {
"Darwin": {
"HOME_PATH": "/afs/ir/users/",
"GDRIVE_INSTALL_STEPS": 'Run "brew install gdrive" in your shell',
"DEFAULT_BROWSER_CMD": "open",
}
}
def __init__(self, sunet=None):
self.OS = platform.system()
self.SUNET = sunet or os.environ.get("USER")
self.HOME = os.environ.get("HOME")
class SuAFS(SuDatastore):
AFS_USER_HOME = None
AFS_HOME_OWNER = None
def __init__(self, sunet=None, afs_user_home=None):
self.OS= platform.system()
self.SUNET= sunet or os.environ.get("USER")
super().__init__(sunet)
if afs_user_home:
self.AFS_USER_HOME = afs_user_home
else:
# import IPython; IPython.embed()
self.AFS_USER_HOME = os.path.join(
self.HOMES_PATH.get(self.OS),
self.OS_SPECIFICS.get(self.OS).get("HOME_PATH"),
self.SUNET[0],
self.SUNET[1],
self.SUNET)
if subprocess.call(['klist', '-t'])!=0:
raise Exception('User {0} has no valid kerberos principal, obtain it with "kinit {0}"'.format(self.SUNET))
if subprocess.call(['aklog'])!=0:
self.SUNET,
)
if subprocess.call(["klist", "-t"]) != 0:
raise Exception(
'User {0} has no valid kerberos principal, obtain it with "kinit {0}"'.format(
self.SUNET
)
)
if subprocess.call(["aklog"]) != 0:
raise Exception('aklog was not successful for user {0}"'.format(self.SUNET))
if not os.path.isdir(self.AFS_USER_HOME):
raise Exception('{} is not mounted, or path is wrong for user {}\'s home'.format(self.AFS_USER_HOME, self.SUNET))
raise Exception(
"{} is not mounted, or path is wrong for user {}'s home".format(
self.AFS_USER_HOME, self.SUNET
)
)
os.chdir(self.AFS_USER_HOME)
acl_output = subprocess.check_output(['fs','la'])
logger.debug('fs la => {}'.format(acl_output))
non_system_matches = re.findall(rb'^\s+(?!.*system:).*$', acl_output, re.MULTILINE)
owner_match = re.search(rb'^\s+(?!.*system:).*rlidwka.*$', acl_output, re.MULTILINE).group()
acl_output = subprocess.check_output(["fs", "la"])
logger.debug("fs la => {}".format(acl_output))
non_system_matches = re.findall(
rb"^\s+(?!.*system:).*$", acl_output, re.MULTILINE
)
owner_match = re.search(
rb"^\s+(?!.*system:).*rlidwka.*$", acl_output, re.MULTILINE
).group()
owner_detected = owner_match.split()[0]
users_detected = [ x.split()[0] for x in non_system_matches]
users_detected = [x.split()[0] for x in non_system_matches]
self.AFS_HOME_OWNER = owner_detected
if not re.search(rb'^Callers access to \. is rlidwka$', subprocess.check_output(['fs','getcalleraccess'])) \
and self.AFS_HOME_OWNER != self.SUNET:
raise Exception('Principal {0} does not seem to be the rightful owner of {1}. User {2} was detected as owner instead"'.format(
self.SUNET,
self.AFS_USER_HOME,
self.AFS_HOME_OWNER))
if (
not re.search(
rb"^Callers access to \. is rlidwka$",
subprocess.check_output(["fs", "getcalleraccess"]),
)
and self.AFS_HOME_OWNER != self.SUNET
):
raise Exception(
'Principal {0} does not seem to be the rightful owner of {1}. User {2} was detected as owner instead"'.format(
self.SUNET, self.AFS_USER_HOME, self.AFS_HOME_OWNER
)
)
class SuGDrive(SuDatastore):
def __init__(self, sunet=None, dest_home_path="imported_AFS"):
super().__init__(sunet)
if subprocess.call(["which", "gdrive"]) != 0:
raise Exception(
'Gdrive CLI app not installed, please install it. {0}"'.format(
self.OS_SPECIFICS.get(self.OS).get("GDRIVE_INSTALL_STEPS")
)
)
if not os.path.exists(os.path.join(self.HOME + ".gdrive/token_v2.json")):
try:
subprocess.check_output(
"gdrive list", input=b"\n", stderr=subprocess.STDOUT, shell=True
)
except subprocess.CalledProcessError as e:
print(
'After you have copied the token from the browser window that just displayed, run "gdrive list" from your command line and paste it when asked'
)
auth_url = re.search(
rb"Go to the following url in your browser:\n(.*)$",
e.output,
re.MULTILINE,
).groups()[0]
auth_url = (
auth_url.decode("utf-8")
+ "&login_hint="
+ self.SUNET
+ "@stanford.edu"
)
logger.debug("auth url => {}".format(auth_url))
os.system(
self.OS_SPECIFICS.get(self.OS).get("DEFAULT_BROWSER_CMD")
+ " '{}'".format(auth_url)
)
# subprocess.check_output('gdrive about')
if __name__ == '__main__':
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('-i', '--interactive', action='store_true',
help="don't return, go into Ipython")
parser.add_argument('--sunet', action='store', dest='sunet',
help='sunet id (default is local username "{}"'.format(os.environ.get("USER")))
parser.add_argument('-c', '--config-dir', action='store', dest='config_dir',
help='Find network segment by host (IP or hostname)')
parser.add_argument(
"-i", "--interactive", action="store_true", help="don't return, go into Ipython"
)
parser.add_argument(
"--sunet",
action="store",
dest="sunet",
help='sunet id (default is local username "{}"'.format(os.environ.get("USER")),
)
parser.add_argument(
"-c",
"--config-dir",
action="store",
dest="config_dir",
help="Find network segment by host (IP or hostname)",
)
args = parser.parse_args()
obj= SuAFS2gDrive(sunet=args.sunet)
print(obj.AFS_USER_HOME)
afs = SuAFS(sunet=args.sunet)
gdrive = SuGDrive(sunet=args.sunet)
logger.debug("Detected AFS attributes: {}".format(afs.__dict__))
logger.debug("Detected gDrive attributes: {}".format(gdrive.__dict__))
if args.interactive:
import IPython; IPython.embed() ##############################################################
\ No newline at end of file
import IPython
IPython.embed() ##############################################################