# Source code for tea.utils.crypto

from __future__ import print_function

__author__ = 'Viktor Kerkez <alefnula@gmail.com>'
__date__ = '11 January 2013'
__copyright__ = 'Copyright (c) 2013 Viktor Kerkez'

import os
import six
import random
from tea.system import platform

__all__ = ['CryptError', 'encrypt', 'decrypt']


class CryptError(Exception):
    """Error raised by this module's encryption and decryption helpers.

    Raised when no suitable algorithm is registered for the platform and
    when data names an algorithm that has no decryption implementation.
    """
# Key to use for encryption if no other alternative is possible.
_key = [213, 21, 199, 11, 17, 227, 18, 7, 114, 184, 27, 162, 37, 112, 222, 209,
        241, 24, 175, 144, 173, 53, 196, 29, 124, 26, 17, 218, 131, 236, 53,
        209]

# Initialization vector for algorithms that require it.
_vector = [16, 63, 1, 111, 23, 3, 113, 119, 21, 121, 31, 112, 44, 32, 114, 255]

# List of supported encryption algorithms in priority order.
algorithms = ('aes', 'dotnet', 'win', 'simple')

# Map of implemented encryption algorithms.
# 'encryption' and 'decryption' each hold a dict of algorithm name to the
# function that implements the algorithm.
# Every function that implements encryption or decryption must receive data to
# process and a key to process it with.
# The encrypter and decrypter decorators can be used to register
# implementations; the keygetter decorator replaces the 'get_key' callable.
implementations = {
    'encryption': {},
    'decryption': {},
    'get_key': lambda: _key
}


# Decorators

def encrypter(name):
    """Decorator for registering function as encrypter.

    The decorated function is stored in ``implementations['encryption']``
    under ``name`` and returned unchanged.
    """
    def wrapper(func):
        implementations['encryption'][name] = func
        return func
    return wrapper


def decrypter(name):
    """Decorator for registering function as decrypter.

    The decorated function is stored in ``implementations['decryption']``
    under ``name`` and returned unchanged.
    """
    def wrapper(func):
        implementations['decryption'][name] = func
        return func
    return wrapper


def keygetter(func):
    """Decorator for registering function for getting the encryption key.

    The decorated function replaces ``implementations['get_key']`` and is
    returned unchanged.
    """
    implementations['get_key'] = func
    return func


# Helpers

def _generate_key(length=32):
    """Generates list of random numbers in range 0 to 255 inclusive.

    :param length: number of values to generate.
    :returns: list of ``length`` integers, each in range 0 to 255.
    """
    # The result is used as an encryption key, so take the bytes from the
    # operating system's secure random source instead of the `random` module,
    # which is not suitable for security purposes.
    return list(bytearray(os.urandom(length)))


def _to_hex_digest(bts):
    """Converts sequence of bytes to hex digest."""
    return ''.join(['%02x' % x for x in map(ord, bts)])


def _from_hex_digest(digest):
    """Converts hex digest to sequence of bytes."""
    return ''.join([chr(int(digest[x:x + 2], 16))
                    for x in range(0, len(digest), 2)])


# Try to register AES from pycrypto
try:
    from Crypto.Cipher import AES

    def _get_cipher(key):
        """Create AES cipher (CFB mode, module-level vector) with the key."""
        key = ''.join(map(chr, key))
        vector = ''.join(map(chr, _vector))
        return AES.new(key, AES.MODE_CFB, vector)

    @encrypter('aes')
    def _aes_encrypt(data, key):
        """Encrypts data using provided key with AES cipher."""
        return _get_cipher(key).encrypt(data)

    @decrypter('aes')
    def _aes_decrypt(data, key):
        """Decrypts data using provided key with AES cipher."""
        return _get_cipher(key).decrypt(data)
except ImportError:
    pass


if platform.is_only(platform.WINDOWS):
    # if ctypes is available add native windows crypting
    try:
        import ctypes
        from ctypes import wintypes
        from ctypes import windll

        class DATA_BLOB(ctypes.Structure):
            # Length/pointer pair passed to and from the crypt32 calls below.
            _fields_ = [
                ('cbData', wintypes.DWORD),
                ('pbData', ctypes.POINTER(ctypes.c_char))
            ]

        def get_data(data_blob):
            """Copy the blob's bytes out, free its buffer and return them."""
            cbData = int(data_blob.cbData)
            pbData = data_blob.pbData
            cbuffer = ctypes.c_buffer(cbData)
            ctypes.cdll.msvcrt.memcpy(cbuffer, pbData, cbData)
            windll.kernel32.LocalFree(pbData)  # @UndefinedVariable
            return cbuffer.raw

        CRYPTPROTECT_UI_FORBIDDEN = 0x01

        @encrypter('win')
        def Win32CryptProtectData(data, key=None):
            """Encrypts data using the Windows CryptProtectData call.

            Text input is encoded as UTF-8 first. Key is not used.
            Returns an empty string if the call reports failure.
            """
            if isinstance(data, six.text_type):
                data = data.encode('utf-8')
            buffer_in = ctypes.c_buffer(data, len(data))
            blob_in = DATA_BLOB(len(data), buffer_in)
            blob_out = DATA_BLOB()
            CryptProtectData = windll.crypt32.CryptProtectData
            if CryptProtectData(ctypes.byref(blob_in), u'tea-crypt', None,
                                None, None, CRYPTPROTECT_UI_FORBIDDEN,
                                ctypes.byref(blob_out)):
                return get_data(blob_out)
            else:
                return ''

        @decrypter('win')
        def Win32CryptUnprotectData(data, key=None):
            """Decrypts data using the Windows CryptUnprotectData call.

            Text input is encoded as UTF-8 first. Key is not used.
            Returns an empty string if the call reports failure.
            """
            if isinstance(data, six.text_type):
                data = data.encode('utf-8')
            buffer_in = ctypes.c_buffer(data, len(data))
            blob_in = DATA_BLOB(len(data), buffer_in)
            blob_out = DATA_BLOB()
            CryptUnprotectData = windll.crypt32.CryptUnprotectData
            if CryptUnprotectData(ctypes.byref(blob_in), None, None, None,
                                  None, CRYPTPROTECT_UI_FORBIDDEN,
                                  ctypes.byref(blob_out)):
                return get_data(blob_out)
            else:
                return ''

        # import win32crypt
        # import win32cryptcon
        # @encrypter('win')
        # def _encrypt(data, key=None):
        #     """Encrypts data using windows crypt service.
        #     Key is not used.
        #     """
        #     windll.c
        #     return win32crypt.CryptProtectData(
        #         data, 'tea-crypt', None, None, None,
        #         win32cryptcon.CRYPTPROTECT_UI_FORBIDDEN
        #     )
        #
        # @decrypter('win')
        # def _decrypt(data, key=None):
        #     """Decrypts data using windows crypt service.
        #     Key is not used.
        #     """
        #     return win32crypt.CryptUnprotectData(
        #         data, None, None, None,
        #         win32cryptcon.CRYPTPROTECT_UI_FORBIDDEN
        #     )[1]
    except ImportError:
        pass

    # if we have win32 installed try add key storage using windows credentials
    try:
        import win32cred
        import pywintypes

        def _vault_store(service, ident, secret):
            """Write secret as a generic credential named ident@service."""
            target = '%(ident)s@%(service)s' % vars()
            credential = dict(Type=win32cred.CRED_TYPE_GENERIC,
                              TargetName=target,
                              UserName=ident,
                              CredentialBlob=secret,
                              Comment='Stored using tea-crypt',
                              Persist=win32cred.CRED_PERSIST_ENTERPRISE)
            win32cred.CredWrite(credential, 0)

        def _vault_retrieve(service, ident):
            """Read the credential named ident@service.

            Returns the credential blob decoded as UTF-16, or None when
            CredRead fails with Windows error 1168.
            """
            target = '%(ident)s@%(service)s' % vars()
            try:
                res = win32cred.CredRead(Type=win32cred.CRED_TYPE_GENERIC,
                                         TargetName=target)
            except pywintypes.error as e:
                if e.winerror == 1168 and e.funcname == 'CredRead':
                    return None
                raise
            return res['CredentialBlob'].decode('utf-16')

        def _vault_delete(service, ident):
            """Delete the credential named ident@service."""
            target = '%(ident)s@%(service)s' % vars()
            win32cred.CredDelete(Type=win32cred.CRED_TYPE_GENERIC,
                                 TargetName=target)

        # TODO: There is a bug with saving and getting key from vault
        # probably something with encoding. Sometimes _vault_retrieve
        # returns numbers higher than 255 (even if they were not stored)
        @keygetter
        def get_key():
            """Return the key kept in Windows credentials, creating it once."""
            key = _vault_retrieve('tea', 'tea-crypt')
            if key:
                # Modulo keeps values in byte range (see the TODO above).
                return [ord(k) % 256 for k in key]
            else:
                key = _generate_key()
                _vault_store('tea', 'tea-crypt', ''.join(map(chr, key)))
                return key
    except ImportError:
        pass

elif platform.is_a(platform.DOTNET):
    # ironpython - .net implementation of AES can be used
    # ironpython can be used on mono so check what options are for key storage
    import clr
    clr.AddReference('System.Security')
    from System import Array, Byte
    from System.Text import UTF8Encoding
    from System.IO import MemoryStream
    from System.Security.Cryptography import (RijndaelManaged, CryptoStream,
                                              CryptoStreamMode, ProtectedData,
                                              DataProtectionScope)

    @encrypter('dotnet')
    def _dotnet_encrypt(text, key):
        """Performs crypting of provided text using AES algorithm.

        The text is encoded as UTF-8 and encrypted with RijndaelManaged using
        the provided key and the module-level vector. The encrypted bytes are
        returned as a string with one character per byte.

        This function is symmetrical with decrypt function.
        """
        utfEncoder = UTF8Encoding()
        bytes_text = utfEncoder.GetBytes(text)
        rm = RijndaelManaged()
        key = Array[Byte](key)
        enc_transform = rm.CreateEncryptor(key, Array[Byte](_vector))
        mem = MemoryStream()
        cs = CryptoStream(mem, enc_transform, CryptoStreamMode.Write)
        cs.Write(bytes_text, 0, len(bytes_text))
        cs.FlushFinalBlock()
        mem.Position = 0
        encrypted = Array.CreateInstance(Byte, mem.Length)
        mem.Read(encrypted, 0, mem.Length)
        cs.Close()
        return ''.join(map(chr, encrypted))

    @decrypter('dotnet')
    def _dotnet_decrypt(data, key):
        """Performs decrypting of provided encrypted data.

        Data must be a string with one character per encrypted byte. The
        decrypted bytes are decoded as UTF-8 and returned as text.

        This function is symmetrical with encrypt function.
        """
        data = Array[Byte](map(Byte, map(ord, data)))
        key = Array[Byte](map(Byte, key))
        rm = RijndaelManaged()
        dec_transform = rm.CreateDecryptor(key, Array[Byte](_vector))
        mem = MemoryStream()
        cs = CryptoStream(mem, dec_transform, CryptoStreamMode.Write)
        cs.Write(data, 0, data.Length)
        cs.FlushFinalBlock()
        mem.Position = 0
        decrypted = Array.CreateInstance(Byte, mem.Length)
        mem.Read(decrypted, 0, decrypted.Length)
        cs.Close()
        utfEncoder = UTF8Encoding()
        return utfEncoder.GetString(decrypted)

    # Can't find a way to invoke win32api from ironpython without wrapping it
    # to C# dll and using pInvoke. So, instead of using windows credentials to
    # store a key, key will be stored in file, encrypted with
    # ProtectedData.Protect.
    @keygetter
    def get_key():
        """Return the key kept in <appdata>/Tea/key.bin, creating it once."""
        from tea.system import get_appdata
        from tea.shell import mkdir
        dir_path = os.path.join(get_appdata(), 'Tea')
        key_path = os.path.join(dir_path, 'key.bin')
        if os.path.exists(dir_path) and os.path.exists(key_path):
            with open(key_path, 'rb') as f:
                cr_key = Array[Byte](map(ord, f.read()))
                key = ProtectedData.Unprotect(cr_key, None,
                                              DataProtectionScope.CurrentUser)
                # NOTE(review): int(k, 10) is applied to the elements of the
                # unprotected array; confirm this conversion works for Byte
                # values under IronPython.
                return [int(k, 10) for k in key]
        else:
            mkdir(dir_path)
            key = _generate_key()
            arr_key = Array[Byte](key)
            cr_key = ProtectedData.Protect(arr_key, None,
                                           DataProtectionScope.CurrentUser)
            with open(key_path, 'wb') as f:
                f.write(cr_key)
            return key

elif platform.is_a(platform.POSIX):
    # some kind of posix OS - more detail check is needed
    pass

elif platform.is_a(platform.DARWIN):
    # macOS - dunno what to do here
    pass

else:
    # this is default cipher - not very secure, but at least password will not
    # be in plain text format.
    @encrypter('simple')
    def _simple_encrypt(data, key):
        """Shift every character by the matching key value, modulo 127."""
        key_len = len(key)
        encrypted = []
        for i, c in enumerate(data):
            key_c = key[i % key_len]
            msg_c = ord(c)
            encrypted.append(chr((msg_c + key_c) % 127))
        return ''.join(encrypted)

    @decrypter('simple')
    def _simple_decrypt(data, key):
        """Reverse the shift applied by the 'simple' encrypter."""
        key_len = len(key)
        decrypted = []
        for i, c in enumerate(data):
            key_c = key[i % key_len]
            enc_c = ord(c)
            decrypted.append(chr((enc_c - key_c) % 127))
        return ''.join(decrypted)


def get_best_algorithm():
    """Return the first algorithm that can both encrypt and decrypt.

    Algorithms are tried in the priority order given by ``algorithms``.

    :raises CryptError: if no algorithm has both implementations registered.
    """
    for alg in algorithms:
        if (alg in implementations['encryption'] and
                alg in implementations['decryption']):
            return alg
    raise CryptError('No crypting algorithms suitable for %s platform' %
                     platform)


# public interface
def encrypt(data, digest=True):
    """Performs encryption of provided data.

    The algorithm chosen by ``get_best_algorithm`` is applied to the data
    with the key returned by the registered key getter.

    :param data: data to encrypt.
    :param digest: if True (default) the encrypted data is converted to a hex
        digest, otherwise it is embedded as returned by the algorithm.
    :returns: string of the form ``'<algorithm>$<encrypted data>'``.
    :raises CryptError: if no suitable algorithm is registered.
    """
    alg = get_best_algorithm()
    enc = implementations['encryption'][alg](data,
                                             implementations['get_key']())
    return '%s$%s' % (alg, (_to_hex_digest(enc) if digest else enc))
def decrypt(data, digest=True):
    """Decrypts provided data.

    Data is expected in the form ``'<algorithm>$<encrypted data>'``. Data
    without an algorithm prefix is returned without decryption.

    :param data: string to decrypt.
    :param digest: if True (default) the encrypted part is a hex digest,
        otherwise it is passed to the algorithm as is.
    :returns: decrypted data.
    :raises CryptError: if no decryption implementation is registered for
        the algorithm named in the data.
    """
    # Split on the first '$': the supported algorithm names contain no '$',
    # but a raw (non-digest) encrypted payload may.
    alg, sep, payload = data.partition('$')
    if not sep:
        # No separator at all - nothing to decrypt.
        return data
    if not alg:
        # Empty algorithm name - hand back what follows the separator.
        return payload
    if digest:
        payload = _from_hex_digest(payload)
    # Only the registry lookup is guarded, so a KeyError raised inside the
    # decryption function itself is not reported as a missing algorithm.
    try:
        decryptor = implementations['decryption'][alg]
    except KeyError:
        raise CryptError('Can not decrypt key for algorithm: %s' % alg)
    return decryptor(payload, implementations['get_key']())
def main():
    """Print the registered implementations and run a small round trip."""
    sample = 'password for encryption'
    print(implementations)
    key_getter = implementations['get_key']
    print(key_getter.__doc__)
    print(key_getter())
    print(encrypt(sample))
    # Encrypt again and feed the result straight back into decrypt.
    print(decrypt(encrypt(sample)))


if __name__ == '__main__':
    main()