BackupSynchronizer: a Simple Directory Backup and Synchronization Script in Python
I have been trying to pick up programming in Python. It strikes me as a charming programming language with a very practical approach to language design, and a standard library that is rich in both depth and breadth.
Anyway, as a first exercise I decided to create a directory backup script. Of course, there are tons of such scripts and utilities out there that do the same thing, but this doesn’t make the exercise less interesting in itself. I started with a recipe from ActiveState Python recipes, and modified it quite a bit to the point where I can’t recognize the original code anymore. I rewrote most of it, including a new user interface with more and better options.
Among the notable changes I made are the following:
- I made it compatible with Python 3. The main thing that needed to change, if I remember correctly, was the calls to print. In Python 3, print is a built-in function rather than a statement as it was in Python 2, so it has to be called as a function, with parentheses and all. To keep the script compatible with Python 2.6 as well, I resorted to this 'magical' import at the beginning of the script:

      from __future__ import print_function

- Notice that printing is wrapped in a small routine and within a try/except block. The reason is that when I was testing it on the Windows command line, I encountered some files with characters that do not print correctly. Among these characters are "\u2013" and "\u0174".
- Changed command line processing from the getopt module to optparse. The optparse module is much more powerful and flexible. It allowed me to declare the options one by one and provide the help text associated with each option. One perk is that it automatically generates the help information and displays it if you run the script on the command line with the --help or -h argument.
- Note the code to make a file writable (in --force mode) before clobbering or deleting it. It checks the OS type and handles Windows and Linux systems differently:

      def _makeWritable(self, path):
          try:
              if os.name == 'nt':
                  # on Windows systems cut to the chase and call the attrib command
                  result = os.system('attrib -R "' + path + '"')
                  if result: raise os.error('Failed to make "' + path + '" writable.')
              else: # assume Linux
                  st = self._stat(path)
                  # find the minimum change required to make it writable
                  flag = stat.S_IWOTH
                  if st.st_uid == os.getuid(): flag = stat.S_IWUSR
                  elif st.st_gid == os.getgid() \
                      or st.st_gid in os.getgroups(): flag = stat.S_IWGRP
                  os.chmod(path, st.st_mode | flag)
          except Exception as e:
              self._error(e)

  To make a file writable on Windows we resort to the system-provided utility attrib. On a Linux system, I implemented a small algorithm that checks the file ownership to decide on the smallest permission change necessary to make the file writable. For example, if the file is owned by the current user, we only need to add the user +w flag, leaving the group and everyone permissions untouched, and so on.
- Cleaned up most of the conditionals, and relied on run-time method mapping to define the branching logic.
- Provided more meaningful output that tracks the directory tree as we traverse it. This also comes with --diff and --compare actions that simply traverse the tree listing differences and similarities, and --quiet and --silent modes which control the verbosity of the output during the sync operations.
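The printing workaround from the list above can be taken one step further. The following is only a sketch, assuming Python 3; safe_print is a hypothetical helper that is not part of backupsync.py. Instead of printing the exception, it retries with the unencodable characters replaced by backslash escapes, so the rest of the line still reaches the console. The ASCII-only console is simulated with an in-memory stream.

```python
import io
import sys


def safe_print(text, stream=None):
    """Print text, escaping characters the stream's encoding cannot represent.

    Hypothetical helper for illustration; not taken from backupsync.py.
    """
    if stream is None:
        stream = sys.stdout
    try:
        print(text, file=stream)
    except UnicodeEncodeError:
        # Retry with an escaped form (for example \u2013) instead of failing.
        encoding = getattr(stream, 'encoding', None) or 'ascii'
        escaped = text.encode(encoding, 'backslashreplace').decode(encoding)
        print(escaped, file=stream)


# Simulate a console that can only encode ASCII.
raw = io.BytesIO()
console = io.TextIOWrapper(raw, encoding='ascii', newline='\n')
safe_print(u'report \u2013 final.txt', console)
console.flush()
print(raw.getvalue())
```

The design choice here is to degrade the output rather than lose it: the escaped name is ugly, but it still identifies the file being processed.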
Well, now we get to the actual code. Here’s a listing of the usage of the script:
    Usage: backupsync.py [options]

    Options:
      -h, --help            show this help message and exit
      -y, --synchronize     (default) Synchronize (copy, purge and update) content
                            between sourcedir and targetdir (sourcedir =>
                            targetdir).
      -u, --update          update existing content between sourcedir and
                            targetdir
      -c, --copy            only copies missing content to targetdir
      -o, --compare         only compare the sourcedir and targetdir, showing
                            differences and commonalities.
      -d, --diff            only report difference between sourcedir and targetdir
      -s, --shallow, --non-recursive
                            does not go recursively into subdirectories
      -n, --dont-purge      does not purge files from target when synchronizing.
      -f, --force           forces copy/delete/update of files/directories by
                            trying to change their write permissions.
      -r, --create-target   create target directory if it does not exist.
      -t, --ctime           compare file's creation time as well as its
                            modification time for an update (by default, it only
                            compares modification time).
      -x REGEX, --exclude=REGEX
                            exclude files and directories that match the provided
                            regular expression.
      -q, --quiet           print less information while processing.
      -l, --silent          print no information while processing; only error
                            messages are printed to stderr.
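A help listing like the one above is generated by optparse from the option declarations. As a minimal sketch of how that works, here are two of the options declared on their own. The build_parser function, the usage string and the sample arguments are illustrative assumptions, not part of the script.

```python
import optparse


def build_parser():
    """Declare two options; optparse derives -h/--help from these declarations."""
    # The usage string below is an assumption made for this sketch.
    parser = optparse.OptionParser(usage="%prog [options] sourcedir targetdir")
    parser.add_option('-f', '--force', dest='force',
                      action='store_true', default=False,
                      help='try to change write permissions when needed.')
    parser.add_option('-x', '--exclude', dest='excludes',
                      action='append', metavar='REGEX',
                      help='exclude names that match REGEX.')
    return parser


parser = build_parser()
# parse_args returns the parsed options plus the leftover positional arguments.
options, args = parser.parse_args(['-f', '-x', r'\.tmp$', 'src', 'dst'])
help_text = parser.format_help()
print(help_text)
```

Because -x uses the append action, repeating it on the command line collects every pattern into one list.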
The full script listing is provided below:
""" backupsync.py -
Adapted from PyRobocopy.py: http://code.activestate.com/recipes/231501/
Compatible with Python 2.6 and Python 3.1
By Robert Mahfoud - http://www.1apple1day.com
01/09/2009: v1.0.0 - First release
01/16/2009: v1.0.0 - Major rewrite
"""
from __future__ import print_function
import optparse
import os, stat
import sys
import time
import shutil
import filecmp
class BackupSynchronizer:
    """ An advanced directory synchronization intended to create and maintain
    backups. """

    def __init__(self):
        self._sourcedir = ''
        self._targetdir = ''
        self._verbosity = 2
        # 2**32 - 1 (the ^ operator is bitwise XOR, not exponentiation)
        self._maxdepth = 2**32 - 1
        self._force = False
        # only switched on by --create-target
        self._maketarget = False
        self._excludes = []
        self._comparetimefunc = self._compareModTimeStamp
        self._copyfunc = self._copy
        self._updatefunc = self._update
        self._purgefunc = self._purge
        self._printsamefunc = self._reportLocation
        self._printdifferentfunc = self._reportLocation
        # stat vars
        self._numdirs = 0
        self._numfiles = 0
        self._numdelfiles = 0
        self._numdeldirs = 0
        self._numnewdirs = 0
        self._numupdates = 0
        self._starttime = 0.0
        self._endtime = 0.0
        # failure stat vars
        self._copyfld = []
        self._updsfld = []
        self._dirsfld = []
        self._delffld = []
        self._deldfld = []

    def parse_args(self, args):
        parser = optparse.OptionParser(usage="%prog [options]")
        parser.add_option('-y', '--synchronize', dest='synchronize'
                          , action='store_true'
                          , default=False
                          , help='(default) Synchronize (copy, purge and '
                                 'update) content between sourcedir and '
                                 'targetdir (sourcedir => targetdir).')
        parser.add_option('-u', '--update', dest='update'
                          , action='store_true'
                          , default=False
                          , help='update existing content between sourcedir '
                                 'and targetdir')
        parser.add_option('-c', '--copy', dest='copy'
                          , action='store_true'
                          , default=False
                          , help='only copies missing content to targetdir')
        parser.add_option('-o', '--compare', dest='compare'
                          , action='store_true'
                          , default=False
                          , help='only compare the sourcedir and targetdir, '
                                 'showing differences and commonalities.')
        parser.add_option('-d', '--diff', dest='diff'
                          , action='store_true'
                          , default=False
                          , help='only report difference between sourcedir '
                                 'and targetdir')
        parser.add_option('-s', '--shallow', '--non-recursive', dest='shallow'
                          , action='store_true'
                          , default=False
                          , help='does not go recursively into subdirectories')
        parser.add_option('-n', '--dont-purge', dest='dontpurge'
                          , action='store_true'
                          , default=False
                          , help='does not purge files from target when '
                                 'synchronizing.')
        parser.add_option('-f', '--force', dest='force'
                          , action='store_true'
                          , default=False
                          , help='forces copy/delete/update of files/'
                                 'directories by trying to change their '
                                 'write permissions.')
        parser.add_option('-r', '--create-target', dest='createtarget'
                          , action='store_true'
                          , default=False
                          , help='create target directory if it does not '
                                 'exist.')
        parser.add_option('-t', '--ctime', dest='creationtime'
                          , action='store_true'
                          , default=False
                          , help="compare file's creation time as well as "
                                 "its modification time for an update (by "
                                 "default, it only compares modification "
                                 "time).")
        parser.add_option('-x', '--exclude', dest='excludes'
                          , action='append'
                          , nargs=1
                          , help='exclude files and directories that match '
                                 'the provided regular expression.'
                          , metavar='REGEX')
        parser.add_option('-q', '--quiet', dest='quiet'
                          , action='store_true'
                          , help='print less information while processing.')
        parser.add_option('-l', '--silent', dest='silent'
                          , action='store_true'
                          , help='print no information while processing; '
                                 'only error messages are printed to stderr.')
        (options, args) = parser.parse_args(args)
        # print(options)
        # print(args)
        # Some validation: exactly one action must be selected. (Chaining
        # the flags with ^ would also accept any odd number of actions.)
        actions = [options.update, options.copy, options.synchronize,
                   options.diff, options.compare]
        if sum(1 for action in actions if action) != 1:
            sys.exit("Must provide exactly one of --update, --copy, "
                     "--synchronize, --diff or --compare.")
        if (not options.synchronize) and options.dontpurge:
            sys.exit("--dont-purge is only valid with --synchronize.")
        if (options.compare or options.diff) \
                and (options.silent or options.quiet):
            sys.exit("--silent and --quiet cannot be combined with "
                     "--compare or --diff!")
        # The actions
        if options.copy:
            self._updatefunc = self._void
            self._purgefunc = self._void
        elif options.update:
            self._copyfunc = self._void
            self._purgefunc = self._void
        elif options.synchronize:
            pass # the default
        elif options.diff:
            self._printsamefunc = self._void
            self._updatefunc = self._void
            self._copyfunc = self._void
            self._purgefunc = self._void
        elif options.compare:
            self._updatefunc = self._void
            self._copyfunc = self._void
            self._purgefunc = self._void
        # The options
        if options.quiet:
            self._verbosity = 1
            self._printsamefunc = self._void
        if options.silent:
            self._printsamefunc = self._void
            self._printdifferentfunc = self._void
            self._verbosity = 0
        if options.shallow:
            self._maxdepth = 1
        if options.force:
            self._force = True
        if options.createtarget:
            self._maketarget = True
        if options.creationtime:
            self._comparetimefunc = self._compareTimeStamps
        if options.dontpurge:
            self._purgefunc = self._void
        self._excludes = options.excludes
        if self._excludes:
            sys.exit("Exclusion of files/directories is not implemented "
                     "yet! Sorry.")
        # The positional arguments: source/target directories
        if len(args) < 2:
            sys.exit("Argument Error: Source and Target directories "
                     "must be provided!")
        elif len(args) > 2:
            # args is a list, so it has to be joined before concatenation
            sys.exit("Too many arguments: " + " ".join(args))
        self._sourcedir = os.path.normcase(os.path.normpath(args[0]))
        self._targetdir = os.path.normcase(os.path.normpath(args[1]))
        if os.path.realpath(self._sourcedir) == os.path.realpath(self._targetdir):
            sys.exit("Source and target directories are the same!")

    def _void(self, *arguments, **keywords):
        pass

    def _print(self, *arguments, **keywords):
        try:
            print(*arguments, **keywords)
        except UnicodeEncodeError as e:
            # Some unicode characters do not render on the console
            # for some reason...
            print(e)
            # arguments2 = []
            # for argument in arguments:
            #     arguments2.append( argument.replace("\u2013", "'")
            #                                .replace("\u0174", "(r)")
            #                                .replace("\xae", "(r)") )
            # print(*arguments2, **keywords)

    def _error(self, *arguments, **keywords):
        self._print(file=sys.stderr, *arguments, **keywords)

    def _print1(self, *arguments, **keywords):
        if self._verbosity > 0: self._print(*arguments, **keywords)

    def _print2(self, *arguments, **keywords):
        if self._verbosity > 1: self._print(*arguments, **keywords)

    def run(self):
        self._starttime = time.time()
        if self._maketarget and not os.path.exists(self._targetdir):
            self._print1('Creating directory "%s"...' % self._targetdir)
            os.makedirs(self._targetdir)
        carryover = ''
        depth = 1
        self._runHelper(self._sourcedir, self._targetdir, carryover, depth)
        self._endtime = time.time()

    def _stat(self, obj):
        if isinstance(obj, str):
            return os.lstat(obj)
        else:
            return obj

    def _isdir(self, obj):
        st = self._stat(obj)
        return stat.S_ISDIR(st.st_mode)

    def _isfile(self, obj):
        st = self._stat(obj)
        return stat.S_ISREG(st.st_mode)

    def _reportLocation(self, source, name, prefix, depth=1, carryover='', message=''):
        self._print(carryover, end='')
        line = self._getLocation(source, name, prefix, depth)
        if message:
            line += ': ' + message
        self._print(line)

    def _getLocation(self, source, name, prefix, depth=1):
        path = os.path.join(source, name)
        if self._isdir(path):
            line = prefix + ' ' + path + os.sep
        else:
            line = prefix*depth + ' ' + os.sep + name
        return line

    def _runHelper(self, source, target, carryover='', depth=1):
        self._numdirs += 1
        dcmp = filecmp.dircmp(source, target)
        if dcmp.common:
            for x in dcmp.common:
                try:
                    file1 = os.path.join(source, x)
                    isdir1 = self._isdir(file1)
                    file2 = os.path.join(target, x)
                    isdir2 = self._isdir(file2)
                    if isdir1 != isdir2:
                        self._printdifferentfunc(source, x, '**', depth, carryover)
                        carryover = ''
                        # delete the existing file/directory in target and replace
                        # it with the one in source (purge file2, never the source)
                        self._purgefunc(file2)
                        self._copyfunc(x, source, target)
                    elif isdir1 and isdir2:
                        line = self._getLocation(source, x, '==', depth)
                        if depth < self._maxdepth:
                            self._runHelper(os.path.join(source, x),
                                            os.path.join(target, x),
                                            carryover + line + "\n", depth+1)
                    else: # two files
                        if not self._comparetimefunc(file1, file2):
                            self._printsamefunc(source, x, '==', depth, carryover)
                        else:
                            self._printdifferentfunc(source, x, '>>', depth, carryover)
                            carryover = ''
                            self._updatefunc(x, source, target)
                except Exception as e:
                    self._error(e)
        if dcmp.left_only:
            self._print2('Files/Directories in "%s" but not in "%s":' % (source, target))
            for x in dcmp.left_only:
                try:
                    self._printdifferentfunc(source, x, '++', depth, carryover)
                    carryover = ''
                    self._copyfunc(x, source, target)
                except Exception as e:
                    self._error(e)
        if dcmp.right_only:
            self._print2('Files/Directories in "%s" but not in "%s":' % (target, source))
            for x in dcmp.right_only:
                try:
                    self._printdifferentfunc(target, x, '--', depth, carryover)
                    carryover = ''
                    self._purgefunc(os.path.join(target, x))
                except Exception as e:
                    self._error(e)

    def _makeWritable(self, path):
        try:
            if os.name == 'nt':
                # on Windows systems cut to the chase and call the attrib command
                result = os.system('attrib -R "' + path + '"')
                if result: raise os.error('Failed to make "' + path + '" writable.')
            else: # assume Linux
                st = self._stat(path)
                # find the minimum change required to make it writable
                flag = stat.S_IWOTH
                if st.st_uid == os.getuid(): flag = stat.S_IWUSR
                elif st.st_gid == os.getgid() \
                        or st.st_gid in os.getgroups(): flag = stat.S_IWGRP
                os.chmod(path, st.st_mode | flag)
        except Exception as e:
            self._error(e)

    def _purge(self, path):
        self._print1('Deleting "%s"...' % path)
        if self._force:
            self._makeWritable(path)
        if self._isfile(path):
            try:
                os.remove(path)
                self._numdelfiles += 1
            except OSError as e:
                self._delffld.append(path)
                raise
        elif self._isdir(path):
            try:
                shutil.rmtree(path, True)
                self._numdeldirs += 1
            except shutil.Error as e:
                self._deldfld.append(path)
                raise

    def _copy(self, x, source, target):
        file = os.path.join(source, x)
        if self._isfile(file):
            self._copyFile(x, source, target)
        elif self._isdir(file):
            self._copyDirectory(x, source, target)

    def _copyDirectory(self, dir, source, target):
        try:
            fulld1 = os.path.join(source, dir)
            fulld2 = os.path.join(target, dir)
            self._print1('Copying directory tree "%s" into "%s"...' % (fulld1, fulld2))
            shutil.copytree(fulld1, fulld2)
            self._numnewdirs += 1
            self._print2('Done.')
        except shutil.Error as e:
            self._dirsfld.append(fulld1)
            raise

    def _copyFile(self, filename, source, target):
        """ Copies a file """
        if not os.path.exists(target):
            # we need to create the destination dir
            if self._force:
                # make sure its parent dir is writable
                self._makeWritable(os.path.dirname(target))
            try:
                os.makedirs(target)
                self._print1('Directory "%s" created.' % target)
            except OSError as e:
                self._dirsfld.append(target)
                raise
        if self._force:
            # assert the destination is writable
            self._makeWritable(target)
        sourcefile = os.path.join(source, filename)
        try:
            self._print1('Copying file "%s" from "%s" to "%s"...' % (filename, source, target))
            shutil.copy2(sourcefile, target)
            self._numfiles += 1
        except (IOError, OSError) as e:
            self._copyfld.append(sourcefile)
            raise

    def _update(self, filename, dir1, dir2):
        """ Updates a file based on
        last time stamp of modification """
        file1 = os.path.join(dir1, filename)
        file2 = os.path.join(dir2, filename)
        # Update the target file if its modification time is older than
        # the source file's modification time, or creation time. Sometimes
        # it so happens that a file's creation time is newer than its
        # modification time! (Seen this on Windows)
        if self._comparetimefunc(file1, file2) or self._compareSize(file1, file2):
            self._print1('Updating file "%s"...' % file2) # source to target
            if self._force:
                self._makeWritable(file2)
            try:
                shutil.copy2(file1, file2)
                self._numupdates += 1
            except (IOError, OSError) as e:
                self._updsfld.append(file2)
                raise

    def _compareTimeStamps(self, x1, x2):
        """ Compare time stamps of two files and return True
        if file1 (source) is more recent than file2 (target) """
        st1 = self._stat(x1)
        st2 = self._stat(x2)
        return round(st1.st_mtime, 3) > round(st2.st_mtime, 3) or round(st1.st_ctime, 3) > round(st2.st_mtime, 3)

    def _compareModTimeStamp(self, x1, x2):
        """ Compare time stamps of two files and return True
        if file1 (source) is more recent than file2 (target) """
        st1 = self._stat(x1)
        st2 = self._stat(x2)
        return round(st1.st_mtime, 3) > round(st2.st_mtime, 3)

    def _compareSize(self, x1, x2):
        """ Compare sizes of two files and return True if different """
        st1 = self._stat(x1)
        st2 = self._stat(x2)
        # != matches the docstring; > would miss a source file that shrank
        return st1.st_size != st2.st_size

    def report(self):
        """ Print report of work at the end """
        # We need only the first 4 significant digits
        tt = (str(self._endtime - self._starttime))[:4]
        self._print2('\nPython backupsync.py finished in', tt, 'seconds.')
        self._print2(self._numdirs, 'directories parsed,',
                     self._numfiles, 'files copied.')
        self._print2(self._numdelfiles, 'files were purged.')
        self._print2(self._numdeldirs, 'directories were purged.')
        self._print2(self._numnewdirs, 'directories were created.')
        self._print2(self._numupdates, 'files were updated by timestamp.')
        # Failure stats
        self._print2('\n')
        self._print2(len(self._copyfld), 'files could not be copied.')
        for f in self._copyfld:
            self._print2("\t"+f)
        self._print2(len(self._dirsfld), 'directories could not be created.')
        for f in self._dirsfld:
            self._print2("\t"+f)
        self._print2(len(self._updsfld), 'files could not be updated.')
        for f in self._updsfld:
            self._print2("\t"+f)
        self._print2(len(self._deldfld), 'directories could not be purged.')
        for f in self._deldfld:
            self._print2("\t"+f)
        self._print2(len(self._delffld), 'files could not be purged.')
        for f in self._delffld:
            self._print2("\t"+f)

if __name__ == "__main__":
    import sys
    copier = BackupSynchronizer()
    copier.parse_args(sys.argv[1:])
    copier.run()
    # print report at the end
    copier.report()
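
The run-time method mapping used in parse_args boils down to swapping a bound method for a no-op, so the traversal code never has to test flags. Here is a stripped-down sketch of the idea. The Walker class and its method names are hypothetical and only mirror the shape of the script above.

```python
class Walker(object):
    """Toy stand-in for the synchronizer; it records actions instead of doing I/O."""

    def __init__(self, copy_enabled=True, purge_enabled=True):
        self.log = []
        # Choose the behaviour once, up front, instead of testing flags later.
        self._copyfunc = self._copy if copy_enabled else self._void
        self._purgefunc = self._purge if purge_enabled else self._void

    def _void(self, *arguments, **keywords):
        pass  # accepts any call signature and does nothing

    def _copy(self, name):
        self.log.append('copy ' + name)

    def _purge(self, name):
        self.log.append('purge ' + name)

    def process(self, missing, extra):
        # No conditionals here: whichever methods were mapped get called.
        for name in missing:
            self._copyfunc(name)
        for name in extra:
            self._purgefunc(name)
        return self.log


print(Walker().process(['a.txt'], ['old.txt']))
print(Walker(purge_enabled=False).process(['a.txt'], ['old.txt']))
```

The first call records both a copy and a purge, while the second records only the copy. This is roughly how a flag like --dont-purge can disable purging without touching the traversal logic.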
