added bunker.bunker.Bunker

This commit is contained in:
Daniel Knüttel 2019-03-13 14:02:48 +01:00
parent 327a585eac
commit a92684735e
3 changed files with 157 additions and 2 deletions

120
bunker/bunker.py Normal file
View File

@ -0,0 +1,120 @@
import uuid
import ljson
from Crypto.Cipher import AES
from .files.tarfile import RewriteableTarFile
from .files.bunkeredfile import BunkeredFile
class Bunker(object):
def __init__(self, path):
self._tarfile = RewriteableTarFile.open(path)
self._components = ljson.Table.from_file(self._tarfile.get_file("__bunker_main__"))
@classmethod
def open(cls, path):
tarfile = RewriteableTarFile.open(path)
if(not tarfile.has_file("__bunker_main__")):
main_file = BunkeredFile.empty("__bunker_main__")
header = ljson.Header({"component": {"type": "str", "modifiers": []}
, "file_name": {"type": "str", "modifiers": []}
, "type": {"type": "str", "modifiers": []}
, "key": {"type": "bytes", "modifiers": []}
, "initvect": {"type": "bytes", "modifiers": []}
, "length": {"type": "int", "modifiers": []}})
table = ljson.Table(header, [])
table.save(main_file)
tarfile.add_file(main_file)
tarfile.close()
return cls(path)
def _load_component(self, name, password):
chunk_size = 16
if(not {"component": name} in self._components):
raise ValueError("unknown component: {}".format(name))
info = self._components[{"component": name}]
file_name = info["file_name"][0]
length = info["length"][0]
password = AES.new(password, AES.MODE_ECB)
key = password.decrypt(info["key"][0])
initvect = password.decrypt(info["initvect"][0])
key = AES.new(key, AES.MODE_CBC, initvect)
file_ = self._tarfile.get_file(file_name)
decrypted = BunkeredFile.empty(file_name, length_hint=len(file_))
length_read = 0
while(True):
chunk = file_.read(chunk_size)
length_read += len(chunk)
if(length_read == length):
decrypted.write(key.decrypt(chunk))
break
if(length_read > length):
decrypted.write(key.decrypt(chunk)[:length - length_read])
break
decrypted.write(key.decrypt(chunk))
decrypted.seek(0, 0)
return decrypted, info["type"][0]
def _add_component(self, name, password, type_):
if({"name": name} in self._components):
raise ValueError("component {} is already in the bunker".format(name))
password = AES.new(password, AES.MODE_ECB)
filename = uuid.uuid1().hex
key = uuid.uuid4().bytes + uuid.uuid4().bytes
initvect = uuid.uuid4().bytes
key = password.encrypt(key)
initvect = password.encrypt(initvect)
self._components.additem({"component": name
, "file_name": filename
, "key": key
, "initvect": initvect
, "type": type_
, "length": 0})
self._tarfile.add_file(BunkeredFile.empty(filename))
self._components.save(self._tarfile.get_file("__bunker_main__"))
self._tarfile.writeback_file("__bunker_main__")
def _save_compontent(self, name, password, file_):
chunk_size = 16
if(not {"component": name} in self._components):
raise ValueError("unknown component: {}".format(name))
info = self._components[{"component": name}]
file_name = info["file_name"][0]
password = AES.new(password, AES.MODE_ECB)
key = password.decrypt(info["key"][0])
initvect = password.decrypt(info["initvect"][0])
key = AES.new(key, AES.MODE_CBC, initvect)
file_out = self._tarfile.get_file(file_name)
file_out.truncate(0)
file_.seek(0, 0)
chunk = file_.read(chunk_size)
length = 0
while(chunk):
length += len(chunk)
if(len(chunk) < 16):
chunk = chunk + b"\x00"*(16 - len(chunk))
file_out.write(key.encrypt(chunk))
chunk = file_.read(chunk_size)
self._components[{"component": name}]["length"] = length
file_out.close()

View File

@ -21,12 +21,12 @@ from setuptools import setup, find_packages
setup(
name = "bunker",
version = "0.0.0",
version = "0.0.1",
packages = find_packages(),
author = "Daniel Knüttel",
author_email = "daniel.knuettel@daknuett.eu",
url = "https://daknuett.eu/gitea/daknuett/bunker",
#install_requires = ["docopt"],
install_requires = ["ljson>=0.5.2", "pycrypto"],
description = "A module for encrypted data storage",
long_description = open("README.rst").read(),

View File

@ -0,0 +1,35 @@
import os
import pytest
from bunker.bunker import Bunker
from bunker.files.bunkeredfile import BunkeredFile
@pytest.fixture
def bunker_with_test_file_empty(tmpdir):
path = os.path.join(str(tmpdir), "test.bunker")
bunker = Bunker.open(path)
bunker._add_component("test", b"H6ihKLXV8HMQWbJs", "kvs")
return bunker, "test", b"H6ihKLXV8HMQWbJs"
def test_add_component(tmpdir):
path = os.path.join(str(tmpdir), "test.bunker")
bunker = Bunker.open(path)
bunker._add_component("test", b"H6ihKLXV8HMQWbJs", "kvs")
assert {"component": "test"} in bunker._components
def test_load_component1(bunker_with_test_file_empty):
bunker, component_name, password = bunker_with_test_file_empty
component, type_ = bunker._load_component(component_name, password)
assert isinstance(component, BunkeredFile)
def test_save_and_load(bunker_with_test_file_empty):
bunker, component_name, password = bunker_with_test_file_empty
component, type_ = bunker._load_component(component_name, password)
component.write(b"this is a test text")
bunker._save_compontent(component_name, password, component)
component, type_ = bunker._load_component(component_name, password)
assert component.read() == b"this is a test text"