initial work basically

master
Daniel Knüttel 2019-08-14 11:25:54 +02:00
commit bafd5992c5
17 changed files with 704 additions and 0 deletions

0
README.rst 100644
View File

View File

View File

@ -0,0 +1,81 @@
import sys
import logging
import docopt
usage = '''
Usage:
autoimport copy SRC_PATH DST_PATH [options]
autoimport move SRC_PATH DST_PATH [options]
autoimport placeholders
autoimport select SRC_PATH [options]
Options:
-t <specifer> --path-template=<specifer> The template for creating the new directory structure
[default: <DateTime.year>/<DateTime.month>/<DateTime.day>]
-n --no-select-stop-on-error Do not stop selecting files when an error occurs.
-w --walk Walk the directory tree when selecting files.
-p <postfix> --postfix=<postfix> Comma separated list of postfixes for files to look for when
selecting files [default: JPG,NEF].
-d --dry-run Do not write changes.
-v --verbose Generate more output.
-D --debug Turn on debug messages.
-i <dbtype> --implementation=<dbtype> Internal database type (mem|disk) [default: mem]
'''
from .commands import copy, move, placeholders, select
from .tmpdb import get_temporary_db
# Command line entry point: parse the arguments, configure logging, open the
# temporary database and dispatch to the requested sub-command.
args = docopt.docopt(usage)

# ``logging.basicConfig`` only has an effect the first time it is called, so
# the level has to be chosen once.  ``--debug`` takes precedence over
# ``--verbose`` when both are given.
if args["--debug"]:
    log_level = logging.DEBUG
elif args["--verbose"]:
    log_level = logging.INFO
else:
    log_level = None
if log_level is not None:
    logging.basicConfig(format='%(levelname)s:%(message)s', level=log_level)

logging.debug("ARGUMENTS:")
for k, v in args.items():
    logging.debug("\t{}: \t{}".format(k, v))

try:
    db = get_temporary_db(args["--implementation"])
except Exception as e:
    # An unknown ``--implementation`` value ends up here; report and give up.
    print(e)
    sys.exit(1)

# Shared positional arguments of the file selecting commands.
stop_on_error = not args["--no-select-stop-on-error"]

try:
    if args["placeholders"]:
        result = placeholders()
    elif args["copy"]:
        result = copy(db,
                      args["SRC_PATH"],
                      args["DST_PATH"],
                      args["--path-template"],
                      stop_on_error,
                      args["--walk"],
                      args["--postfix"],
                      args["--dry-run"])
    elif args["move"]:
        result = move(db,
                      args["SRC_PATH"],
                      args["DST_PATH"],
                      args["--path-template"],
                      stop_on_error,
                      args["--walk"],
                      args["--postfix"],
                      args["--dry-run"])
    elif args["select"]:
        result = select(db,
                        args["SRC_PATH"],
                        stop_on_error,
                        args["--walk"],
                        args["--postfix"],
                        args["--dry-run"])
    else:
        # The usage pattern requires one of the commands above, so this is
        # only a safety net that keeps ``result`` bound.
        result = 1
finally:
    # Close the temporary database even when a command raises.
    db.close()

sys.exit(result)

View File

@ -0,0 +1,147 @@
import logging
import traceback
from .select.select import findall
from .order.path_specifier import get_path_specifier, placeholders as ph
from .order.order import order
from .write.paths import create_paths
from .write.files import write_files
def placeholders():
    """
    Print all known placeholders in sorted order, one per line.

    Returns ``0``.
    """
    ordered = sorted(ph)
    for placeholder in ordered:
        print(placeholder)
    return 0
def select(db, src_path, stop_on_error, walk, postfix, dryrun):
    """
    Collect the matching files below ``src_path`` into ``db`` and print them.

    ``postfix`` is a comma separated list of file name postfixes.  For every
    row of the ``FILES`` table the file name is printed, followed by one
    indented line per metadata field, and finally the number of files found.

    ``dryrun`` is accepted but not used by this command.

    Returns ``0`` on success and ``1`` if collecting the files failed.
    """
    log = logging.getLogger(__name__)
    wanted_postfixes = postfix.split(",")

    try:
        findall(src_path, walk, wanted_postfixes, db, stop_on_error)
    except Exception as error:
        log.error(error)
        log.debug(traceback.format_exc())
        return 1

    # Labels for the columns that follow the file name in a ``FILES`` row.
    labels = (
        "DateTime",
        "DateTimeDigitized",
        "DateTimeOriginal",
        "Model",
        "Make",
        "Software",
    )

    cursor = db.cursor()
    for row in cursor.execute('''SELECT * FROM FILES'''):
        print(row[0])
        for label, value in zip(labels, row[1:]):
            print("\t", label, ":", value)

    cursor.execute('''SELECT COUNT(name) FROM FILES''')
    file_count = cursor.fetchone()[0]
    print("found {} files".format(file_count))
    return 0
def copy(db, src_path, dst_path, path_template, stop_on_error, walk, postfix, dryrun):
    """
    Copy the selected files into the ordered directory structure.

    Thin wrapper around ``do_copy_or_move`` with ``move`` set to ``False``;
    returns whatever that function returns.
    """
    return do_copy_or_move(
        db,
        src_path,
        dst_path,
        path_template,
        stop_on_error,
        walk,
        postfix,
        dryrun,
        move=False,
    )
def move(db, src_path, dst_path, path_template, stop_on_error, walk, postfix, dryrun):
    """
    Move the selected files into the ordered directory structure.

    Thin wrapper around ``do_copy_or_move`` with ``move`` set to ``True``;
    returns whatever that function returns.
    """
    return do_copy_or_move(
        db,
        src_path,
        dst_path,
        path_template,
        stop_on_error,
        walk,
        postfix,
        dryrun,
        move=True,
    )
def do_copy_or_move(db
        , src_path
        , dst_path
        , path_template
        , stop_on_error
        , walk
        , postfix
        , dryrun
        , move):
    """
    Select, order and then copy (or move) the files.

    The stages are:

    1. collect the files below ``src_path`` into ``db`` (``findall``),
    2. parse ``path_template`` (``get_path_specifier``),
    3. associate every file with a target directory (``order``),
    4. create the target directories below ``dst_path`` (``create_paths``),
    5. copy the files, removing the sources if ``move`` is true
       (``write_files``).

    ``postfix`` is a comma separated list of file name postfixes.
    ``dryrun`` is passed on to the directory and file writing stages.

    Returns ``0`` on success, ``1`` if selecting files failed, ``2`` if
    the path template is invalid and ``3`` if creating directories or
    writing files failed.
    """
    logger = logging.getLogger(__name__)

    extensions = postfix.split(",")
    try:
        findall(src_path, walk, extensions, db, stop_on_error)
    except Exception as e:
        logger.error(e)
        logger.debug(traceback.format_exc())
        return 1

    cursor = db.cursor()
    cursor.execute(
        '''SELECT COUNT(name) FROM FILES'''
    )
    print("found {} files".format(cursor.fetchone()[0]))

    try:
        path_specifier = get_path_specifier(path_template)
    except Exception as e:
        logger.error(str(e))
        logger.debug(traceback.format_exc())
        return 2

    # NOTE(review): unlike the other stages this one is not guarded, so an
    # exception raised while ordering propagates to the caller.
    order(db, path_specifier)

    cursor.execute(
        '''SELECT COUNT(rowid) FROM ASSOCIATIONS'''
    )
    print("created {} associations between files and directories".format(cursor.fetchone()[0]))
    cursor.execute(
        '''SELECT COUNT(name) FROM DIRECTORIES'''
    )
    print("will create {} new directories".format(cursor.fetchone()[0]))

    # Dumping the whole database is only useful for debugging; skip the
    # iteration entirely unless debug messages are actually emitted.
    if logger.isEnabledFor(logging.DEBUG):
        for line in db._db.iterdump():
            logger.debug(line)

    try:
        create_paths(db, dst_path, dryrun)
    except Exception as e:
        logger.error(str(e))
        logger.debug(traceback.format_exc())
        return 3

    try:
        write_files(db, dst_path, src_path, move, dryrun)
    except Exception as e:
        logger.error(str(e))
        logger.debug(traceback.format_exc())
        return 3

    print("done")
    # Explicit success code, consistent with ``select`` and ``placeholders``.
    return 0

View File

View File

@ -0,0 +1,5 @@
import datetime
def get_datetime(time_str):
    """
    Parse a ``YYYY:MM:DD HH:MM:SS`` time stamp into a ``datetime.datetime``.

    Raises ``ValueError`` (from ``strptime``) if ``time_str`` does not match
    that format.
    """
    return datetime.datetime.strptime(time_str, "%Y:%m:%d %H:%M:%S")

View File

@ -0,0 +1,80 @@
import os
import logging
from .date_and_time import get_datetime
def order(db, path_specifier):
    """
    Associate every row of ``FILES`` with a target directory.

    ``path_specifier`` is a list of path components.  Components that are
    known placeholders are replaced by the matching value of the file,
    every other component is used literally.  The joined path is looked up
    (or created) in ``DIRECTORIES`` and the pair is stored in
    ``ASSOCIATIONS``.
    """
    log = logging.getLogger(__name__)
    cursor = db.cursor()
    cursor.execute(
        "SELECT rowid,\n"
        "name,\n"
        "DateTime,\n"
        "DateTimeDigitized,\n"
        "DateTimeOriginal,\n"
        "Model,\n"
        "Make,\n"
        "Software\n"
        "FROM FILES"
    )

    # These time stamp fields are rendered zero padded to two digits; the
    # year is used as it is.
    padded_fields = ("day", "month", "hour", "minute", "second")

    for row in cursor.fetchall():
        (file_id, name, date_time, date_time_digitized,
         date_time_original, model, make, software) = row

        substitutions = {
            "<name>": name,
            "<Model>": model,
            "<Make>": make,
            "<Software>": software,
        }
        time_stamps = (
            ("DateTime", date_time),
            ("DateTimeDigitized", date_time_digitized),
            ("DateTimeOriginal", date_time_original),
        )
        for prefix, raw_value in time_stamps:
            parsed = get_datetime(raw_value)
            substitutions["<{}.year>".format(prefix)] = parsed.year
            for field in padded_fields:
                key = "<{}.{}>".format(prefix, field)
                substitutions[key] = str(getattr(parsed, field)).zfill(2)

        components = [
            str(substitutions[part]) if part in substitutions else part
            for part in path_specifier
        ]
        log.debug(components)
        directory = os.path.join(*components)
        directory_id = get_path_id(db, directory)
        cursor.execute(
            "INSERT INTO ASSOCIATIONS(file_id, directory_id) VALUES(?, ?)",
            (file_id, directory_id),
        )
def get_path_id(db, path):
    """
    Return the rowid of ``path`` in ``DIRECTORIES``.

    If the path is not stored yet it is inserted and the rowid of the new
    row is returned.
    """
    cursor = db.cursor()
    existing = cursor.execute(
        "SELECT rowid FROM DIRECTORIES WHERE name=?", (path,)
    ).fetchone()
    if not existing:
        cursor.execute("INSERT INTO DIRECTORIES(name) VALUES(?)", (path,))
        return cursor.lastrowid
    return existing[0]

View File

@ -0,0 +1,62 @@
"""
This module provides a way to build the path specifiers used internally
(lists of strings) from the input path specifier (a string).

The input string contains literal path components and placeholders.
Placeholders are marked by chevrons: ``<placeholder>``

An example path specifier might look like this::

    <DateTime.year>/<DateTime.month>/<DateTime.day>/images/<Make>/<Model>

The resulting internal specifier will be::

    [
        "<DateTime.year>"
        , "<DateTime.month>"
        , "<DateTime.day>"
        , "images"
        , "<Make>"
        , "<Model>"
    ]

The module also checks whether the placeholders are actually valid.
"""
import os
# The set of all placeholders that may appear as a path component in a
# path specifier.  ``get_path_specifier`` rejects any ``<...>`` component
# that is not listed here.
placeholders = {
"<name>",
"<DateTime.day>",
"<DateTime.month>",
"<DateTime.year>",
"<DateTime.hour>",
"<DateTime.minute>",
"<DateTime.second>",
"<DateTimeDigitized.day>",
"<DateTimeDigitized.month>",
"<DateTimeDigitized.year>",
"<DateTimeDigitized.hour>",
"<DateTimeDigitized.minute>",
"<DateTimeDigitized.second>",
"<DateTimeOriginal.day>",
"<DateTimeOriginal.month>",
"<DateTimeOriginal.year>",
"<DateTimeOriginal.hour>",
"<DateTimeOriginal.minute>",
"<DateTimeOriginal.second>",
"<Model>",
"<Make>",
"<Software>"
}
def get_path_specifier(string_path_specifer):
    """
    Split the input path specifier into the internal list of components.

    The string is split at ``os.path.sep``.  Where the platform defines an
    alternative separator (``os.path.altsep``) it is accepted as well, so
    a template written with ``/`` is still split into its components on
    platforms whose primary separator is a different character.

    Raises ``ValueError`` if a component looks like a placeholder (it is
    enclosed in chevrons) but is not a known placeholder.
    """
    normalized = string_path_specifer
    if os.path.altsep:
        normalized = normalized.replace(os.path.altsep, os.path.sep)

    data = normalized.split(os.path.sep)
    for d in data:
        looks_like_placeholder = d.startswith("<") and d.endswith(">")
        if looks_like_placeholder and d not in placeholders:
            raise ValueError("unknown placeholder: {}".format(d))
    return data

View File

View File

@ -0,0 +1,66 @@
import logging
import json
from PIL import Image, ExifTags
import exifread
def extract_metadata_from_file(filename):
    """
    Read the EXIF values needed for ordering from the image ``filename``.

    The image is opened with PIL.  If the image object offers ``_getexif``
    the EXIF data is read through PIL, otherwise ``exifread`` is used.

    Returns a dict with the keys ``DateTime``, ``DateTimeDigitized``,
    ``DateTimeOriginal``, ``Model``, ``Make`` and ``Software``.

    Raises ``KeyError`` if one of those values is missing and re-raises
    any exception that occurs while opening the image or reading its EXIF
    data.
    """
    logger = logging.getLogger(__name__)
    logger.info("handling: {}".format(filename))

    try:
        img = Image.open(filename)
    except Exception:
        # ``img`` was never bound when ``Image.open`` fails, so there is
        # nothing to close here; just report and re-raise the real error.
        logger.error("failed to open and load '{}'".format(filename))
        raise

    if hasattr(img, "_getexif"):
        try:
            # NOTE(review): ``_getexif`` appears to return ``None`` for
            # images without an EXIF block; treat that as "no tags" so the
            # missing-value check below reports it - confirm against Pillow.
            raw_exif = img._getexif() or {}
            exif = {ExifTags.TAGS[k]: v
                    for k, v in raw_exif.items()
                    if k in ExifTags.TAGS}
        except Exception:
            logger.error("failed to read EXIF data from '{}'".format(filename))
            raise
        finally:
            img.close()
    else:
        img.close()
        # PIL offers no EXIF access for this image, so fall back to
        # exifread.  This is a little slower but works more reliably.
        exif = get_exif_with_exifread(filename)

    # A tuple keeps the order in which missing values are reported stable.
    values_no_preprocessing = ("DateTime"
                               , "DateTimeDigitized"
                               , "DateTimeOriginal"
                               , "Model"
                               , "Make"
                               , "Software")
    for k in values_no_preprocessing:
        if k not in exif:
            logger.error("missing EXIF value {} in '{}'".format(
                k, filename))
            raise KeyError("missing EXIF value {}".format(k))

    return {k: exif[k] for k in values_no_preprocessing}
def get_exif_with_exifread(filename):
    """
    Read the tags of ``filename`` with ``exifread``.

    Returns a dict that maps tag names to the ``values`` of the tag.  It
    contains all tags whose name starts with ``"EXIF "`` (with that prefix
    removed) plus ``DateTime``, ``Make``, ``Software`` and ``Model`` taken
    from the tags whose name starts with ``"Image "``.

    An ``"Image "`` tag that is not present is left out instead of raising
    here, so the caller can report the missing value together with the
    file name.
    """
    with open(filename, "rb") as image:
        tags = exifread.process_file(image)

    exif_tag_header = "EXIF "
    exif_tag_header_length = len(exif_tag_header)
    data = {k[exif_tag_header_length:]: v.values
            for k, v in tags.items()
            if k.startswith(exif_tag_header)}

    # Not all the tags we want are in the EXIF section.
    image_tag_header = "Image "
    for key in ("DateTime", "Make", "Software", "Model"):
        tag = tags.get(image_tag_header + key)
        if tag is not None:
            data[key] = tag.values
    return data

View File

@ -0,0 +1,54 @@
import os
import logging
module_logger = logging.getLogger(__name__)
from .metadata import extract_metadata_from_file
def findall_this_directory(directory, files, extensions, db, stop_on_error):
    """
    Insert the matching files of one directory into the database.

    A file matches if the part of its name after the last ``"."`` is
    contained in ``extensions``; the comparison is case sensitive.
    ``files`` holds file names relative to ``directory``.
    """
    for entry in files:
        module_logger.debug("handling file: {}".format(entry))
        postfix = entry.rsplit(".", 1)[-1]
        if postfix not in extensions:
            continue
        insert_file_into_db(os.path.join(directory, entry), db, stop_on_error)
def insert_file_into_db(filename, db, stop_on_error):
    """
    Extract the metadata of ``filename`` and store it in ``FILES``.

    If extracting the metadata fails and ``stop_on_error`` is true the
    exception is re-raised.  Otherwise the error is logged together with
    the file name and the file is skipped.
    """
    try:
        metadata = extract_metadata_from_file(filename)
    except Exception as e:
        if stop_on_error:
            module_logger.error(
                "an error occured, the program execution ends now, set ``--no-select-stop-on-error`` to continue anyways")
            module_logger.error("file was: {}".format(filename))
            raise
        # Name the skipped file and the reason, otherwise the log does not
        # tell which files were left out.
        module_logger.error("ignoring error in '{}': {}".format(filename, e))
        return

    # The order has to match the column list of the INSERT statement.
    metadata_keys = ["DateTime"
                     , "DateTimeDigitized"
                     , "DateTimeOriginal"
                     , "Model"
                     , "Make"
                     , "Software"]
    data = [filename]
    data.extend(metadata[k] for k in metadata_keys)

    cursor = db.cursor()
    cursor.execute('''INSERT INTO FILES(name,
                                        DateTime,
                                        DateTimeDigitized,
                                        DateTimeOriginal,
                                        Model,
                                        Make,
                                        Software)
                      VALUES(?, ?, ?, ?, ?, ?, ?)'''
                   , data)
def findall(directory, walk, extensions, db, stop_on_error):
    """
    Insert all matching files below ``directory`` into the database.

    If ``walk`` is false only ``directory`` itself is searched, otherwise
    the whole directory tree is.
    """
    levels = os.walk(directory)
    if not walk:
        # Only the top level is wanted: take the first entry (if any).
        top_level = next(levels, None)
        levels = [] if top_level is None else [top_level]

    for current_directory, _subdirectories, filenames in levels:
        findall_this_directory(current_directory, filenames, extensions, db, stop_on_error)

112
autoimport/tmpdb.py 100644
View File

@ -0,0 +1,112 @@
"""
This module provides a way to construct the temporary database
used by ``autoimport``.
The database is used to transfer the data between the ``autoimport``
modules: ``select``, ``order`` and ``write``.
``autoimport`` always uses a sqlite3 database as an interface but
the database can be stored in memory (fast) or on the disk
(for huge amounts of images).
"""
import sqlite3
import tempfile
import abc
def _open_db_mem():
    """
    Open an in-memory sqlite3 database.

    Returns the tuple ``(connection, None)``; the second element is the
    slot that ``_open_db_disk`` uses for its backing file.
    """
    connection = sqlite3.connect(":memory:")
    return (connection, None)
def _open_db_disk():
    """
    Open a sqlite3 database stored in a named temporary file.

    Returns the tuple ``(connection, temporary_file)``.  The temporary
    file object is returned as well so that the caller can close it.
    """
    backing_file = tempfile.NamedTemporaryFile()
    return (sqlite3.connect(backing_file.name), backing_file)
class AbstractTemporaryDatabase(abc.ABC):
    """
    Common base class of the ``TemporaryDatabase`` implementations.

    **Note**: the ``__init__`` of an implementation has to store an open
    sqlite3 connection in ``self._db``.
    """

    def __init__(self):
        abc.ABC.__init__(self)
        # Replaced with the open sqlite3 connection by the implementations.
        self._db = None

    @abc.abstractmethod
    def close(self):
        """Close the database; has to be provided by implementations."""

    def cursor(self):
        """Return a new cursor of the underlying sqlite3 connection."""
        return self._db.cursor()

    def dump_db(self, file):
        """Write the SQL dump of the database to ``file``, one statement per line."""
        for statement in self._db.iterdump():
            file.write(statement + "\n")
class MemoryTemporaryDatabase(AbstractTemporaryDatabase):
    """Temporary database that is kept in memory."""

    def __init__(self):
        AbstractTemporaryDatabase.__init__(self)
        connection, _no_file = _open_db_mem()
        self._db = connection

    def close(self):
        """Close the sqlite3 connection."""
        self._db.close()
class DiskTemporaryDatabase(AbstractTemporaryDatabase):
    """Temporary database that is stored in a temporary file on the disk."""

    def __init__(self):
        AbstractTemporaryDatabase.__init__(self)
        self._db, self._file = _open_db_disk()

    def close(self):
        """Close the sqlite3 connection first, then the temporary file."""
        self._db.close()
        self._file.close()
def get_temporary_db(type_):
    """
    Return an open ``TemporaryDatabase`` with already set up tables.

    ``type_`` is either ``"mem"`` for the in-memory implementation or
    ``"disk"`` for the on-disk implementation.

    The tables ``FILES``, ``DIRECTORIES``, ``ASSOCIATIONS`` and ``KV`` are
    created before the database is returned.

    Raises ``ValueError`` if ``type_`` is not a known implementation.
    """
    implementations = {"mem": MemoryTemporaryDatabase,
                       "disk": DiskTemporaryDatabase}
    if type_ not in implementations:
        raise ValueError("unsupported implementation: {}".format(type_))

    instance = implementations[type_]()

    cursor = instance.cursor()
    cursor.execute(
        '''CREATE TABLE FILES(
            name TEXT,
            DateTime TEXT,
            DateTimeDigitized TEXT,
            DateTimeOriginal TEXT,
            Model TEXT,
            Make TEXT,
            Software TEXT)'''
    )
    cursor.execute(
        '''CREATE TABLE DIRECTORIES(
            name TEXT)'''
    )
    cursor.execute(
        '''CREATE TABLE ASSOCIATIONS(file_id INTEGER,
            directory_id INTEGER)'''
    )
    cursor.execute(
        '''CREATE TABLE KV(key TEXT,
            value TEXT)'''
    )
    return instance

View File

View File

@ -0,0 +1,47 @@
"""
This module provides functions to copy/move the files.
"""
import os
import logging
import shutil
module_logger = logging.getLogger(__name__)
def write_files(db, output_basepath, input_basepath, move, dry_run):
    """
    Copy (or move) every associated file into its target directory.

    The target of a file is ``output_basepath`` joined with the directory
    the file is associated with and the base name of the file.  If ``move``
    is true the source file is removed after it has been copied.

    With ``dry_run`` nothing is written; the operations are only logged
    (at warning level).

    ``input_basepath`` is accepted but not used.
    """
    # NOTE(review): an already existing target file is overwritten without
    # any check - confirm that this is intended, in particular for ``move``.
    query = (
        "SELECT DIRECTORIES.name AS pathname,\n"
        "FILES.name AS filename\n"
        "FROM FILES JOIN ASSOCIATIONS ON FILES.rowid=ASSOCIATIONS.file_id\n"
        "JOIN DIRECTORIES ON DIRECTORIES.rowid=ASSOCIATIONS.directory_id\n"
    )
    cursor = db.cursor()
    rows = cursor.execute(query)

    for directory, source in rows:
        target = os.path.join(output_basepath, directory, os.path.basename(source))

        if not dry_run:
            module_logger.info("COPY {} -> {}".format(source, target))
            shutil.copyfile(source, target)
            shutil.copystat(source, target)
            if move:
                module_logger.info("RM {}".format(source))
                os.remove(source)
            continue

        # Dry run: only report what would have been done.
        module_logger.warning("COPY {} -> {}".format(source, target))
        if move:
            module_logger.warning("RM {}".format(source))

View File

@ -0,0 +1,32 @@
"""
This module creates the required paths for copying or moving the files.
"""
import os
import logging
module_logger = logging.getLogger(__name__)
def create_paths(db, base_path, dry_run):
    """
    Create the directories listed in ``DIRECTORIES`` below ``base_path``.

    Directories that exist already are only logged.  With ``dry_run``
    nothing is created; the directories that would be created are logged
    at warning level instead.

    Re-raises the exception if creating a directory fails.
    """
    cursor = db.cursor()
    result = cursor.execute(
        '''SELECT name FROM DIRECTORIES'''
    )
    for (pathname,) in result:
        real_path_name = os.path.join(base_path, pathname)

        if os.path.exists(real_path_name):
            module_logger.info("EXISTS: {}".format(real_path_name))
            continue

        if dry_run:
            # ``Logger.warn`` is deprecated; use ``warning`` instead.
            module_logger.warning("CREATE {}".format(real_path_name))
            continue

        module_logger.info("CREATE {}".format(real_path_name))
        try:
            os.makedirs(real_path_name)
        except Exception:
            module_logger.error("failed to create directory {}".format(real_path_name))
            raise

3
requirements.txt 100644
View File

@ -0,0 +1,3 @@
Pillow
exifread
docopt

15
setup.py 100644
View File

@ -0,0 +1,15 @@
from setuptools import setup, find_packages

# Read the long description with a context manager so that the file is
# closed again.
with open("README.rst") as readme:
    long_description = readme.read()

setup(
    name = "autoimport",
    version = "0.0.1",
    packages = find_packages(),
    author = "Daniel Knüttel",
    author_email = "daniel.knuettel@daknuett.eu",
    # The package imports PIL (Pillow) and exifread in addition to docopt,
    # as listed in requirements.txt.
    install_requires = ["docopt", "Pillow", "exifread"],
    description = "A script to find, order and copy images",
    long_description = long_description,
    # NOTE(review): ``licor`` is not a module of this package and console
    # script entries are normally written ``name = module:callable`` -
    # confirm the intended target.
    entry_points = {"console_scripts": ["autoimport = licor"]}
)