Friday, November 13, 2015

mount and umount with ctypes

While automating a Linux build, I came across a phase where we had to perform a mount. A simple way to do this is to run the mount command using the subprocess module, as shown in the code snippet below.

#------------------------
import subprocess

# Directory to bind-mount and the location where it should appear.
mountSource = "/home/addy/workspace/pymount/test1"
mountTarget = "/home/addy/workspace/pymount/test2"
fstype = "ext4"

# Build the argument vector directly instead of formatting a string and
# splitting it on spaces: splitting would break any path that contains a
# space into several separate arguments.
argv = ["mount", "-B", "-t", fstype, mountSource, mountTarget]

# Human-readable form of the command, kept for logging purposes.
command = " ".join(argv)

print(argv)
# check_output raises CalledProcessError if mount exits with a non-zero status.
output = subprocess.check_output(argv)

#-------------------------

Alternatively, we could have used ctypes. With Python 3, the challenge was to make ctypes work: Python 3 strings are Unicode by default, so we need to convert them to byte strings before passing them to Linux system calls via the ctypes library.

This turns out to be an easy task once we understand the internals: we use the encode method of the strings to convert them to ASCII byte strings.

The following code demonstrates a bind mount from one directory to another.

#-------------------------
import ctypes
import ctypes.util
import os

# libc expects byte strings. os.fsencode converts using the filesystem
# encoding without silently replacing characters, so the path handed to the
# kernel is always the path that was written here.
mountSource = os.fsencode("/home/addy/workspace/pymount/test3")
mountTarget = os.fsencode("/home/addy/workspace/pymount/test4")

libcPath = ctypes.util.find_library("c")
# use_errno=True makes ctypes keep a private copy of errno for calls made
# through this handle; without it ctypes.get_errno() does not reflect the
# failure reason of the call.
libc = ctypes.CDLL(libcPath, use_errno=True)

# MS_BIND from sys/mount.h: make mountSource visible at mountTarget.
MS_BIND = 4096

retCode = libc.mount(mountSource, mountTarget, None, MS_BIND, None)
if retCode != 0:
    errNumber = ctypes.get_errno()
    print("os.ErrNo = " + str(errNumber) + " (" + os.strerror(errNumber) + ")")

print("ls the mountTarget " + str(mountTarget))
output = os.system("ls -al " + os.fsdecode(mountTarget))

# umount(2) takes only the target path; flags belong to umount2(2).
errorCode = libc.umount(mountTarget)
print("Error Code for umount" + str(errorCode))
if errorCode != 0:
    errNumber = ctypes.get_errno()
    print("Unmount unsuccessful " + str(errNumber) + " (" + os.strerror(errNumber) + ")")
else:
    print("successful unmount")

print("unmount")


#------------------------------------------

A full class implementation, although not tested yet:

#----------------------------------------

import ctypes
import ctypes.util
import os
import pwd

class Mounter(object):
    """Thin ctypes wrapper around the libc ``mount``/``umount`` calls.

    The ``MS_*`` class attributes mirror the mount-flag constants from
    ``sys/mount.h``; they can be OR-ed together and passed as ``flags``.
    """
    MS_RDONLY = 1
    MS_NOSUID = 2
    MS_NODEV = 4
    MS_NOEXEC = 8
    MS_SYNCHRONOUS = 16
    MS_REMOUNT = 32
    MS_MANDLOCK = 64
    MS_DIRSYNC = 128
    MS_NOATIME = 1024
    MS_NODIRATIME = 2048
    MS_BIND = 4096
    MS_MOVE = 8192
    MS_REC = 16384
    MS_SILENT = 32768
    MS_POSIXACL = 1 << 16
    MS_UNBINDABLE = 1 << 17
    MS_PRIVATE = 1 << 18
    MS_SLAVE = 1 << 19
    MS_SHARED = 1 << 20
    MS_RELATIME = 1 << 21
    MS_KERNMOUNT = 1 << 22
    MS_I_VERSION = 1 << 23
    MS_STRICTATIME = 1 << 24
    MS_ACTIVE = 1 << 30
    MS_NOUSER = 1 << 31

    # NOTE: the default refers to MS_RDONLY directly. Inside the class body
    # the name ``Mounter`` is not bound yet, so ``Mounter.MS_RDONLY`` would
    # raise NameError while the class is being defined.
    def __init__(self, sourcePath, destPath, fileSystem=None, flags=MS_RDONLY):
        """Validate the arguments and load libc.

        Args:
            sourcePath: Path (or device) to mount; must be readable.
            destPath: Mount point; must be writable.
            fileSystem: Optional filesystem type name. When given it must be
                listed in ``/proc/filesystems``. ``None`` passes a NULL type
                to ``mount``.
            flags: Bitwise OR of the ``MS_*`` constants.

        Raises:
            ValueError: If the filesystem type is not listed in
                ``/proc/filesystems`` or a path fails its access check.
        """
        if fileSystem is not None:
            with open('/proc/filesystems') as filesystems:
                fslist = self._parse_filesystems(filesystems)
            # The check lives inside the ``is not None`` branch: fslist only
            # exists when a filesystem type was actually requested.
            if fileSystem not in fslist:
                raise ValueError([fileSystem," doesn't exist in filesystem list ",fslist])

        if not os.access(sourcePath, os.R_OK):
            raise ValueError(["Cannot access " , sourcePath])
        if not os.access(destPath, os.W_OK):
            raise ValueError(["Cannot access " , destPath])

        # All validation passed; record the configuration.
        self.fileSystem = fileSystem
        # use_errno=True is required for ctypes.get_errno() to report the
        # errno set by calls made through this handle.
        self.libc = ctypes.CDLL(self.get_libc(), use_errno=True)
        # Declare the C prototypes so the flags are passed as unsigned long
        # (MS_NOUSER does not fit in a signed C int) and paths as char*.
        self.libc.mount.argtypes = (ctypes.c_char_p, ctypes.c_char_p,
                                    ctypes.c_char_p, ctypes.c_ulong,
                                    ctypes.c_void_p)
        self.libc.mount.restype = ctypes.c_int
        self.libc.umount.argtypes = (ctypes.c_char_p,)
        self.libc.umount.restype = ctypes.c_int
        self.libc.umount2.argtypes = (ctypes.c_char_p, ctypes.c_int)
        self.libc.umount2.restype = ctypes.c_int
        self.mountSource = sourcePath
        self.mountTarget = destPath
        self.flags = flags

    @staticmethod
    def _parse_filesystems(lines):
        """Return the filesystem names found in ``/proc/filesystems`` lines.

        Each line has the form ``<nodev-or-empty>\\t<name>\\n``; the name is
        the second tab-separated field with the trailing newline removed.
        Lines without a second field are skipped.
        """
        names = []
        for line in lines:
            fields = line.rstrip("\n").split("\t")
            if len(fields) > 1 and fields[1]:
                names.append(fields[1])
        return names

    @staticmethod
    def _raise_last_errno():
        """Raise OSError for the errno left by the last failed libc call."""
        code = ctypes.get_errno()
        # Two-argument form so that ``.errno`` and ``.strerror`` are set.
        raise OSError(code, os.strerror(code))

    def get_libc(self):
        """Return the libc library name located by ``ctypes.util.find_library``."""
        return ctypes.util.find_library("c")

    def is_current_user_root(self):
        """Return True when the process's real user id is 0 (root)."""
        # Compare the uid rather than the passwd name: the lookup by name
        # fails when the uid has no passwd entry.
        return os.getuid() == 0

    def mount_path(self):
        """Mount ``mountSource`` on ``mountTarget`` using the stored flags.

        Raises:
            OSError: If the ``mount`` call returns non-zero; carries errno.
        """
        # os.fsencode keeps every character of the path; encoding with
        # errors="ignore" would silently drop non-ASCII characters and hand
        # a different path to the kernel.
        sourceBytes = os.fsencode(self.mountSource)
        targetBytes = os.fsencode(self.mountTarget)
        if self.fileSystem is None:
            fileSystemBytes = None  # passed to mount() as NULL
        else:
            fileSystemBytes = os.fsencode(self.fileSystem)

        errorCode = self.libc.mount(sourceBytes,
                                    targetBytes,
                                    fileSystemBytes,
                                    self.flags, None)  # no filesystem-specific data
        if errorCode != 0:
            self._raise_last_errno()

    def unmount_path(self, flags=None):
        """Unmount ``mountTarget``.

        Args:
            flags: Optional integer flags. When given, ``umount2`` is called
                with them; otherwise plain ``umount`` is used.

        Raises:
            OSError: If the unmount call returns non-zero; carries errno.
        """
        targetBytes = os.fsencode(self.mountTarget)
        if flags is None:
            # umount() takes only the target path.
            errorCode = self.libc.umount(targetBytes)
        else:
            errorCode = self.libc.umount2(targetBytes, flags)
        if errorCode != 0:
            self._raise_last_errno()

#----------------------------------------------------- 


No comments:

Post a Comment