from __future__ import print_function
__author__ = 'Viktor Kerkez <alefnula@gmail.com>'
__date__ = '11 January 2013'
__copyright__ = 'Copyright (c) 2013 Viktor Kerkez'
import os
import six
import random
from tea.system import platform
__all__ = ['CryptError', 'encrypt', 'decrypt']
class CryptError(Exception):
pass
# key to use for encryption if no other alternative is possible
_key = [213, 21, 199, 11, 17, 227, 18, 7, 114, 184, 27,
162, 37, 112, 222, 209, 241, 24, 175, 144, 173,
53, 196, 29, 124, 26, 17, 218, 131, 236, 53, 209]
# initialization vector for algorithms that requires is
_vector = [16, 63, 1, 111, 23, 3, 113, 119, 21,
121, 31, 112, 44, 32, 114, 255]
# list of supported encryption algorithms in priority order
algorithms = ('aes', 'dotnet', 'win', 'simple')
# Map of implemented encryption algorithms
# Encryption and decryption holds dict of algorithm name to function that
# implements algorithm.
# Every function that implements encryption or decryption must receive data to
# process and a key to process it with.
# encryptior and decryption decorators can be used to register implementations.
implementations = {
'encryption': {},
'decryption': {},
'get_key': lambda: _key
}
# Decorators
def encrypter(name):
"""Decorator for registering function as encryptor."""
def wrapper(func):
implementations['encryption'][name] = func
return func
return wrapper
def decrypter(name):
"""Decorator for registering function as decryptor."""
def wrapper(func):
implementations['decryption'][name] = func
return func
return wrapper
def keygetter(func):
"""Decorator for registering function for getting the encryption key."""
implementations['get_key'] = func
return func
# Helpers
def _generate_key(length=32):
"""Generates list of random number in range 0 to 255 inclusive of specified
length."""
return [random.randint(0, 255) for _ in range(length)]
def _to_hex_digest(bts):
"""Converts sequence of bytes to hex digest."""
return ''.join(['%02x' % x for x in map(ord, bts)])
def _from_hex_digest(digest):
"""Converts hex digest to sequence of bytes."""
return ''.join([chr(int(digest[x:x + 2], 16))
for x in range(0, len(digest), 2)])
# Try to register AES from pycrypt
try:
from Crypto.Cipher import AES
def _get_cipher(key):
"""Create AES chiper with provided key."""
key = ''.join(map(chr, key))
vector = ''.join(map(chr, _vector))
return AES.new(key, AES.MODE_CFB, vector)
@encrypter('aes')
def _aes_encrypt(data, key):
"""Encrypts data using provided key with AES cipher."""
return _get_cipher(key).encrypt(data)
@decrypter('aes')
def _aes_decrypt(data, key):
"""Decrypts data using provided key with AES cipher."""
return _get_cipher(key).decrypt(data)
except ImportError:
pass
if platform.is_only(platform.WINDOWS):
# if we have win32 installed add native windows crypting
try:
import ctypes
from ctypes import wintypes
from ctypes import windll
class DATA_BLOB(ctypes.Structure):
_fields_ = [
('cbData', wintypes.DWORD),
('pbData', ctypes.POINTER(ctypes.c_char))
]
def get_data(data_blob):
cbData = int(data_blob.cbData)
pbData = data_blob.pbData
cbuffer = ctypes.c_buffer(cbData)
ctypes.cdll.msvcrt.memcpy(cbuffer, pbData, cbData)
windll.kernel32.LocalFree(pbData) # @UndefinedVariable
return cbuffer.raw
CRYPTPROTECT_UI_FORBIDDEN = 0x01
@encrypter('win')
def Win32CryptProtectData(data, key=None):
if isinstance(data, six.text_type):
data = data.encode('utf-8')
buffer_in = ctypes.c_buffer(data, len(data))
blob_in = DATA_BLOB(len(data), buffer_in)
blob_out = DATA_BLOB()
CryptProtectData = windll.crypt32.CryptProtectData
if CryptProtectData(ctypes.byref(blob_in), u'tea-crypt',
None, None, None,
CRYPTPROTECT_UI_FORBIDDEN,
ctypes.byref(blob_out)):
return get_data(blob_out)
else:
return ''
@decrypter('win')
def Win32CryptUnprotectData(data, key=None):
if isinstance(data, six.text_type):
data = data.encode('utf-8')
buffer_in = ctypes.c_buffer(data, len(data))
blob_in = DATA_BLOB(len(data), buffer_in)
blob_out = DATA_BLOB()
CryptUnprotectData = windll.crypt32.CryptUnprotectData
if CryptUnprotectData(ctypes.byref(blob_in),
None, None, None, None,
CRYPTPROTECT_UI_FORBIDDEN,
ctypes.byref(blob_out)):
return get_data(blob_out)
else:
return ''
# import win32crypt
# import win32cryptcon
# @encrypter('win')
# def _encrypt(data, key=None):
# """Encrypts data using windows crypt service.
# Key is not used.
# """
# windll.c
# return win32crypt.CryptProtectData(
# data, 'tea-crypt', None, None, None,
# win32cryptcon.CRYPTPROTECT_UI_FORBIDDEN
# )
#
# @decrypter('win')
# def _decrypt(data, key=None):
# """Decrypts data using windows crypt service.
# Key is not used.
# """
# return win32crypt.CryptUnprotectData(
# data, None, None, None,
# win32cryptcon.CRYPTPROTECT_UI_FORBIDDEN
# )[1]
except ImportError:
pass
# if we have win32 installed try add key storage using windows credentials
try:
import win32cred
import pywintypes
def _vault_store(service, ident, secret):
target = '%(ident)s@%(service)s' % vars()
credential = dict(Type=win32cred.CRED_TYPE_GENERIC,
TargetName=target,
UserName=ident,
CredentialBlob=secret,
Comment='Stored using tea-crypt',
Persist=win32cred.CRED_PERSIST_ENTERPRISE)
win32cred.CredWrite(credential, 0)
def _vault_retrieve(service, ident):
target = '%(ident)s@%(service)s' % vars()
try:
res = win32cred.CredRead(Type=win32cred.CRED_TYPE_GENERIC,
TargetName=target)
except pywintypes.error as e:
if e.winerror == 1168 and e.funcname == 'CredRead':
return None
raise
return res['CredentialBlob'].decode('utf-16')
def _vault_delete(service, ident):
target = '%(ident)s@%(service)s' % vars()
win32cred.CredDelete(Type=win32cred.CRED_TYPE_GENERIC,
TargetName=target)
# TODO: There is a bug with saving and getting key from vault
# probably something with encoding. Sometimes _vault_retrieve
# returns numbers higher then 255 (even if they were not stored)
@keygetter
def get_key():
key = _vault_retrieve('tea', 'tea-crypt')
if key:
return [ord(k) % 256 for k in key]
else:
key = _generate_key()
_vault_store('tea', 'tea-crypt', ''.join(map(chr, key)))
return key
except ImportError:
pass
elif platform.is_a(platform.DOTNET):
# ironpython - .net implementation of AES can be used
# ironpython can be used on mono so check what options are for key storage
import clr
clr.AddReference('System.Security')
from System import Array, Byte
from System.Text import UTF8Encoding
from System.IO import MemoryStream
from System.Security.Cryptography import (RijndaelManaged, CryptoStream,
CryptoStreamMode, ProtectedData,
DataProtectionScope)
@encrypter('dotnet')
def _dotnet_encrypt(text, key):
"""Performs crypting of provided text using AES algorithm.
If 'digest' is True hex_digest will be returned, otherwise bytes of
encrypted data will be returned.
This function is symmetrical with decrypt function.
"""
utfEncoder = UTF8Encoding()
bytes_text = utfEncoder.GetBytes(text)
rm = RijndaelManaged()
key = Array[Byte](key)
enc_transform = rm.CreateEncryptor(key, Array[Byte](_vector))
mem = MemoryStream()
cs = CryptoStream(mem, enc_transform, CryptoStreamMode.Write)
cs.Write(bytes_text, 0, len(bytes_text))
cs.FlushFinalBlock()
mem.Position = 0
encrypted = Array.CreateInstance(Byte, mem.Length)
mem.Read(encrypted, 0, mem.Length)
cs.Close()
return ''.join(map(chr, encrypted))
@decrypter('dotnet')
def _dotnet_decrypt(data, key):
"""Performs decrypting of provided encrypted data.
If 'digest' is True data must be hex digest, otherwise data should be
encrypted bytes.
This function is symmetrical with encrypt function.
"""
data = Array[Byte](map(Byte, map(ord, data)))
key = Array[Byte](map(Byte, key))
rm = RijndaelManaged()
dec_transform = rm.CreateDecryptor(key, Array[Byte](_vector))
mem = MemoryStream()
cs = CryptoStream(mem, dec_transform, CryptoStreamMode.Write)
cs.Write(data, 0, data.Length)
cs.FlushFinalBlock()
mem.Position = 0
decrypted = Array.CreateInstance(Byte, mem.Length)
mem.Read(decrypted, 0, decrypted.Length)
cs.Close()
utfEncoder = UTF8Encoding()
return utfEncoder.GetString(decrypted)
# Can't find a way to invoke win32api from ironpython without wrapping it
# to C# dll and using pInvoke. So, instead of using windows credentials to
# store a key, key will be stored in file, encrypted with
# ProtedtedData.Protect.
@keygetter
def get_key():
from tea.system import get_appdata
from tea.shell import mkdir
dir_path = os.path.join(get_appdata(), 'Tea')
key_path = os.path.join(dir_path, 'key.bin')
if os.path.exists(dir_path) and os.path.exists(key_path):
with open(key_path, 'rb') as f:
cr_key = Array[Byte](map(ord, f.read()))
key = ProtectedData.Unprotect(cr_key, None,
DataProtectionScope.CurrentUser)
return [int(k, 10) for k in key]
else:
mkdir(dir_path)
key = _generate_key()
arr_key = Array[Byte](key)
cr_key = ProtectedData.Protect(arr_key, None,
DataProtectionScope.CurrentUser)
with open(key_path, 'wb') as f:
f.write(cr_key)
return key
elif platform.is_a(platform.POSIX):
# some kind of posix OS - more detail check is needed
pass
elif platform.is_a(platform.DARWIN):
# macOS - dunno what to do here
pass
else:
# this is default cipher - not very secure, but at least password will not
# be in plain text format.
@encrypter('simple')
def _simple_encrypt(data, key):
key_len = len(key)
encrypted = []
for i, c in enumerate(data):
key_c = key[i % key_len]
msg_c = ord(c)
encrypted.append(chr((msg_c + key_c) % 127))
return ''.join(encrypted)
@decrypter('simple')
def _simple_decrypt(data, key):
key_len = len(key)
decrypted = []
for i, c in enumerate(data):
key_c = key[i % key_len]
enc_c = ord(c)
decrypted.append(chr((enc_c - key_c) % 127))
return ''.join(decrypted)
def get_best_algorithm():
for alg in algorithms:
if (alg in implementations['encryption'] and
alg in implementations['decryption']):
return alg
raise CryptError('No crypting algorithms suitable for %s platform' %
platform)
# public interface
[docs]def encrypt(data, digest=True):
"""Performs encryption of provided data."""
alg = get_best_algorithm()
enc = implementations['encryption'][alg](data,
implementations['get_key']())
return '%s$%s' % (alg, (_to_hex_digest(enc) if digest else enc))
[docs]def decrypt(data, digest=True):
"""Decrypts provided data."""
alg, _, data = data.rpartition('$')
if not alg:
return data
data = _from_hex_digest(data) if digest else data
try:
return implementations['decryption'][alg](data,
implementations['get_key']())
except KeyError:
raise CryptError('Can not decrypt key for algorithm: %s' % alg)
def main():
print(implementations)
print(implementations['get_key'].__doc__)
print(implementations['get_key']())
print(encrypt('password for encryption'))
print(decrypt(encrypt('password for encryption')))
if __name__ == '__main__':
main()