# Source code for tea.msg.mail

"""Simple and complete library for sending emails"""

__author__ = 'Viktor Kerkez <alefnula@gmail.com>'
__date__ = '27 November 2009'
__copyright__ = 'Copyright (c) 2009 Viktor Kerkez'

__all__ = ['SMTPConnection', 'EmailMessage', 'EmailMultiAlternatives',
           'send_mail', 'send_mass_mail']


import os
import six
import time
import socket
import random
import smtplib
import logging
import mimetypes
from tea.utils.html import strip_tags
from tea.utils.encoding import smart_text, smart_bytes
from six.moves.email_mime_text import MIMEText
from six.moves.email_mime_base import MIMEBase
from six.moves.email_mime_multipart import MIMEMultipart
if six.PY3:
    from email import charset as Charset
    from email.header import Header
    from email.encoders import encode_base64
    from email.utils import formatdate, parseaddr, formataddr
else:
    from email import Charset
    from email.Header import Header
    from email.Encoders import encode_base64
    from email.Utils import formatdate, parseaddr, formataddr


logger = logging.getLogger(__name__)


# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
# some spam filters.
Charset.add_charset('utf-8', Charset.SHORTEST, Charset.QP, 'utf-8')

# Default MIME type to use on attachments (if it is not explicitly given
# and cannot be guessed).
DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream'

# Default charset
DEFAULT_CHARSET = 'utf-8'


# Cache the hostname, but do it lazily: socket.getfqdn() can take a couple of
# seconds, which slows down the restart of the server.
class CachedDnsName(object):
    """Lazily resolved, memoized fully qualified domain name of this host.

    The name is looked up with ``socket.getfqdn()`` the first time it is
    requested and stored on the instance; later requests reuse the stored
    value. ``str(instance)`` yields the same name.
    """

    def __str__(self):
        return self.get_fqdn()

    def get_fqdn(self):
        """Return the cached FQDN, resolving it on first use."""
        try:
            return self._fqdn
        except AttributeError:
            # First access: nothing cached yet, so resolve and remember it.
            self._fqdn = socket.getfqdn()
            return self._fqdn


DNS_NAME = CachedDnsName()


# Copied from Python standard library, with the following modifications:
# * Used cached hostname for performance.
# * Added try/except to support lack of getpid() in Jython (#5496).
def make_msgid(idstring=None, utc=False):
    """Build a string usable as an RFC 2822 style Message-ID, e.g:

    <20020201195627.33539.96671@nightshade.la.mastaler.com>

    The id is assembled from a timestamp, the process id, a random number
    below 100000, the optional ``idstring`` and the cached host name.

    ``idstring``, when given, is appended (after a dot) to strengthen the
    uniqueness of the message id. ``utc`` selects ``time.gmtime()`` instead
    of ``time.localtime()`` for the timestamp component.
    """
    now = time.gmtime() if utc else time.localtime()
    stamp = time.strftime('%Y%m%d%H%M%S', now)
    try:
        process_id = os.getpid()
    except AttributeError:
        # No getpid() in Jython, for example.
        process_id = 1
    nonce = random.randrange(100000)
    suffix = '' if idstring is None else '.' + idstring
    return '<%s.%s.%s%s@%s>' % (stamp, process_id, nonce, suffix, DNS_NAME)


class BadHeaderError(ValueError):
    """Raised when a header value contains a newline or carriage return."""


def forbid_multi_line_headers(name, val):
    """Forbids multi-line headers, to prevent header injection.

    The value is converted to text with ``smart_text`` and rejected if it
    contains a newline or carriage return. Otherwise it is prepared for use
    as a header value:

    * pure-ASCII values are returned as the interpreter's native string
      type (``Subject`` is additionally wrapped in a ``Header``);
    * non-ASCII ``To``/``From``/``Cc`` values are split on ``', '`` and the
      display name of every address is RFC 2047 encoded;
    * any other non-ASCII value is wrapped in a ``Header`` using
      ``DEFAULT_CHARSET``.

    Returns the ``(name, val)`` pair; raises ``BadHeaderError`` for values
    containing line breaks.
    """
    val = smart_text(val)
    if '\n' in val or '\r' in val:
        raise BadHeaderError('Header values can\'t contain newlines '
                             '(got %r for header %r)' % (val, name))
    header = name.lower()
    try:
        ascii_val = val.encode('ascii')
    except UnicodeEncodeError:
        if header in ('to', 'from', 'cc'):
            result = []
            for item in val.split(', '):
                nm, addr = parseaddr(item)
                # Header.encode() yields the RFC 2047 encoded form on both
                # Python 2 and 3; str(Header) returns the raw, undecoded
                # text on Python 3 and would leave non-ASCII in the header.
                nm = Header(nm, DEFAULT_CHARSET).encode()
                result.append(formataddr((nm, str(addr))))
            val = ', '.join(result)
        else:
            val = Header(val, DEFAULT_CHARSET)
    else:
        # Only adopt the encoded value where it is the native string type
        # (Python 2). On Python 3 it is ``bytes``, which the email package
        # cannot serialise as a header value, so the text is kept instead.
        if isinstance(ascii_val, str):
            val = ascii_val
        if header == 'subject':
            val = Header(val)
    return name, val


class SafeMIMEText(MIMEText):
    """MIMEText whose header assignments pass through
    ``forbid_multi_line_headers`` before being stored.
    """

    def __setitem__(self, name, val):
        # Validate/encode first; a rejected value raises before anything is
        # stored on the message.
        checked = forbid_multi_line_headers(name, val)
        MIMEText.__setitem__(self, *checked)


class SafeMIMEMultipart(MIMEMultipart):
    """MIMEMultipart whose header assignments pass through
    ``forbid_multi_line_headers`` before being stored.
    """

    def __setitem__(self, name, val):
        # Validate/encode first; a rejected value raises before anything is
        # stored on the message.
        checked = forbid_multi_line_headers(name, val)
        MIMEMultipart.__setitem__(self, *checked)


class SMTPConnection(object):
    """A wrapper that manages the SMTP network connection.

    ``host``/``port`` are handed to ``smtplib.SMTP``; ``username`` and
    ``password`` are used to log in when both are set; ``use_tls`` enables
    STARTTLS (``None`` is treated as ``False``). With ``fail_silently``
    errors are logged and swallowed instead of being re-raised.
    """

    def __init__(self, host=None, port=None, username=None, password=None,
                 use_tls=None, fail_silently=False):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.use_tls = (use_tls is not None) and use_tls
        self.fail_silently = fail_silently
        self.connection = None

    def open(self):
        """Ensures we have a connection to the email server.

        Returns whether or not a new connection was required (True or
        False). When connecting fails the error is logged, any partially
        set up connection is dropped, and the exception is re-raised unless
        ``fail_silently`` is set (in which case False is returned).
        """
        if self.connection:
            # Nothing to do if the connection is already open.
            return False
        try:
            # If local_hostname is not specified, socket.getfqdn() gets used.
            # For performance, we use the cached FQDN for local_hostname.
            self.connection = smtplib.SMTP(self.host, self.port,
                                           local_hostname=DNS_NAME.get_fqdn())
            if self.use_tls:
                self.connection.ehlo()
                self.connection.starttls()
                self.connection.ehlo()
            if self.username and self.password:
                self.connection.login(self.username, self.password)
            return True
        except Exception as e:
            logger.error('Error trying to connect to server %s:%s: %s',
                         self.host, self.port, e)
            # A connection whose TLS/login step failed must not be mistaken
            # for a usable one by send_messages() or a later open().
            self._discard_connection()
            if not self.fail_silently:
                raise
            return False

    def _discard_connection(self):
        """Drops the current connection object, closing it on a best-effort
        basis.
        """
        connection, self.connection = self.connection, None
        if connection is None:
            return
        try:
            connection.close()
        except Exception:
            # Deliberate best effort: the error that led here has already
            # been logged and is the one worth reporting.
            pass

    def close(self):
        """Closes the connection to the email server.

        Does nothing when no connection is open. Errors raised while
        quitting are logged and re-raised unless ``fail_silently`` is set;
        the connection attribute is reset to None in every case.
        """
        if self.connection is None:
            return
        # socket.sslerror only exists on Python 2; ssl.SSLError is the same
        # exception there and the only spelling available on Python 3.
        try:
            import ssl
            tls_errors = (ssl.SSLError,)
        except ImportError:
            tls_errors = ()
        try:
            try:
                self.connection.quit()
            except tls_errors:
                # This happens when calling quit() on a TLS connection
                # sometimes.
                self.connection.close()
        except Exception as e:
            logger.error('Error trying to close connection to server '
                         '%s:%s: %s', self.host, self.port, e)
            if self.fail_silently:
                return
            raise
        finally:
            self.connection = None

    def send_messages(self, messages):
        """Sends one or more EmailMessage objects and returns the number of
        email messages sent.

        Returns 0 when there is nothing to send or when opening the
        connection failed silently. A connection opened by this call is
        closed again before returning, even if sending raises.
        """
        if not messages:
            return 0
        new_conn_created = self.open()
        if not self.connection:
            # We failed silently on open(). Trying to send would be pointless.
            return 0
        num_sent = 0
        try:
            for message in messages:
                if self._send(message):
                    num_sent += 1
        finally:
            # Only tear down what this call set up; a connection that was
            # already open stays under the caller's control.
            if new_conn_created:
                self.close()
        return num_sent

    def _send(self, message):
        """A helper method that does the actual sending.

        Returns False when the message has no recipients or when sending
        failed silently, True when the message was handed to the server.
        """
        if not message.recipients():
            return False
        try:
            self.connection.sendmail(message.sender, message.recipients(),
                                     message.message().as_string())
        except Exception as e:
            logger.error('Error sending a message to server %s:%s: %s',
                         self.host, self.port, e)
            if not self.fail_silently:
                raise
            return False
        return True
class EmailMessage(object):
    """A container for email information."""

    content_subtype = 'plain'
    multipart_subtype = 'mixed'
    encoding = None  # None => fall back to DEFAULT_CHARSET

    def __init__(self, subject='', body='', sender=None, to=None, cc=None,
                 bcc=None, attachments=None, headers=None, connection=None):
        """Initialize a single email message (which can be sent to multiple
        recipients).

        ``to``, ``cc`` and ``bcc`` must each be a list or tuple (or empty);
        anything else raises ``AssertionError``. Every entry of
        ``attachments`` is passed to ``attach()``: tuples and lists are
        unpacked into its arguments, other objects are passed as is.

        All strings used to create the message can be unicode strings (or
        UTF-8 bytestrings). The SafeMIMEText class will handle any necessary
        encoding conversions.
        """
        self.to = self._address_list(to, 'to')
        self.cc = self._address_list(cc, 'cc')
        self.bcc = self._address_list(bcc, 'bcc')
        self.sender = sender
        self.subject = subject
        self.body = body
        self.attachments = []
        for attachment in (attachments or []):
            if isinstance(attachment, (tuple, list)):
                self.attach(*attachment)
            else:
                self.attach(attachment)
        self.extra_headers = headers or {}
        self.connection = connection

    @staticmethod
    def _address_list(value, field):
        """Returns ``value`` as a new list, or [] when it is empty/None."""
        if not value:
            return []
        if not isinstance(value, (list, tuple)):
            raise AssertionError(
                '"%s" argument must be a list or tuple' % field)
        return list(value)

    def get_connection(self, fail_silently=False):
        """Returns the message's connection, creating a default
        SMTPConnection on first use.
        """
        if not self.connection:
            self.connection = SMTPConnection(fail_silently=fail_silently)
        return self.connection

    def message(self):
        """Builds and returns the MIME message for this email.

        Without attachments the result is a single text part; with
        attachments it is a multipart container holding the body (when
        non-empty) followed by the attachments. ``Date`` and ``Message-ID``
        are generated unless supplied through the extra headers.
        """
        encoding = self.encoding or DEFAULT_CHARSET
        msg = SafeMIMEText(smart_bytes(self.body, DEFAULT_CHARSET),
                           self.content_subtype, encoding)
        if self.attachments:
            body_msg = msg
            msg = SafeMIMEMultipart(_subtype=self.multipart_subtype)
            if self.body:
                msg.attach(body_msg)
            for attachment in self.attachments:
                if isinstance(attachment, MIMEBase):
                    msg.attach(attachment)
                else:
                    msg.attach(self._create_attachment(*attachment))
        msg['Subject'] = self.subject
        msg['From'] = self.sender
        msg['To'] = ', '.join(self.to)
        if self.cc:
            # A Cc header with an empty address list is not valid, so it is
            # only written when there is at least one Cc recipient.
            msg['Cc'] = ', '.join(self.cc)

        # Email header names are case-insensitive (RFC 2045), so we have to
        # accommodate that when doing comparisons.
        header_names = set(key.lower() for key in self.extra_headers)
        if 'date' not in header_names:
            msg['Date'] = formatdate()
        if 'message-id' not in header_names:
            msg['Message-ID'] = make_msgid()
        for name, value in self.extra_headers.items():
            msg[name] = value
        return msg

    def recipients(self):
        """Returns a list of all recipients of the email (includes direct
        addressees as well as Bcc entries).
        """
        return self.to + self.cc + self.bcc

    def send(self, fail_silently=False):
        """Sends the email message through its connection and returns the
        result of the connection's ``send_messages``.
        """
        return self.get_connection(fail_silently).send_messages([self])

    def attach(self, filename=None, content=None, mimetype=None):
        """Attaches a file with the given filename and content. The filename
        can be omitted (useful for multipart/alternative messages) and the
        mimetype is guessed, if not provided.

        If the first parameter is a MIMEBase subclass it is inserted directly
        into the resulting message attachments. If only a filename is given
        and it names an existing file, the file is read from disk.

        Raises ``AssertionError`` when a MIMEBase object is combined with
        ``content``/``mimetype`` or when no content can be determined.
        """
        if isinstance(filename, MIMEBase):
            # Explicit raise rather than ``assert`` so the check survives
            # ``python -O``.
            if content is not None or mimetype is not None:
                raise AssertionError('content and mimetype must not be given '
                                     'together with a MIMEBase attachment')
            self.attachments.append(filename)
        elif (content is None and filename is not None and
                os.path.isfile(filename)):
            self.attach_file(filename, mimetype)
        else:
            if content is None:
                raise AssertionError('attachment content must be provided '
                                     'unless filename is an existing file')
            self.attachments.append((filename, content, mimetype))

    def attach_file(self, path, mimetype=None):
        """Attaches a file from the filesystem, read in binary mode."""
        filename = os.path.basename(path)
        with open(path, 'rb') as handle:
            content = handle.read()
        self.attach(filename, content, mimetype)

    def _create_attachment(self, filename, content, mimetype=None):
        """Converts the filename, content, mimetype triple into a MIME
        attachment object.

        A missing mimetype is guessed from the filename and falls back to
        ``DEFAULT_ATTACHMENT_MIME_TYPE``. ``text/*`` content becomes a text
        part; everything else is base64 encoded.
        """
        if mimetype is None and filename:
            mimetype, _ = mimetypes.guess_type(filename)
        if mimetype is None:
            mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
        basetype, subtype = mimetype.split('/', 1)
        if basetype == 'text':
            attachment = SafeMIMEText(smart_bytes(content, DEFAULT_CHARSET),
                                      subtype, DEFAULT_CHARSET)
        else:
            # Encode non-text attachments with base64.
            attachment = MIMEBase(basetype, subtype)
            attachment.set_payload(content)
            encode_base64(attachment)
        if filename:
            attachment.add_header('Content-Disposition', 'attachment',
                                  filename=filename)
        return attachment
class EmailMultiAlternatives(EmailMessage):
    """An EmailMessage variant for multipart/alternative messages.

    It makes it easy to ship several representations of the same content,
    for example a text and an HTML version of the text.
    """

    multipart_subtype = 'alternative'

    def attach_alternative(self, content, mimetype=None):
        """Attach an alternative content representation."""
        # Alternatives carry no filename; they go through the regular
        # attachment path.
        self.attach(filename=None, content=content, mimetype=mimetype)
def send_mail(subject, sender, to, message, html_message=None, cc=None,
              bcc=None, attachments=None, host=None, port=None,
              auth_user=None, auth_password=None, use_tls=False,
              fail_silently=False):
    """Easy wrapper for sending a single message to a recipient list. All
    members of the recipient list will see the other recipients in the 'To'
    field.

    ``to``, ``cc`` and ``bcc`` may each be a list/tuple of addresses or a
    single address string. When only ``html_message`` is given, the plain
    text body is derived from it with ``strip_tags``; when both are given
    the HTML version is attached as a ``text/html`` alternative.

    Returns the result of the message's ``send()``. Raises ``ValueError``
    when neither ``message`` nor ``html_message`` is provided.

    Note: The API for this method is frozen. New code wanting to extend the
    functionality should use the EmailMessage class directly.
    """
    if message is None and html_message is None:
        raise ValueError('Either message or html_message must be provided')
    if message is None:
        message = strip_tags(html_message)
    connection = SMTPConnection(host=host, port=port, username=auth_user,
                                password=auth_password, use_tls=use_tls,
                                fail_silently=fail_silently)
    # Convert single-address strings to lists just for easier usage.
    if isinstance(to, six.string_types):
        to = [to]
    # Empty strings are left alone for cc/bcc: EmailMessage already treats
    # them as "no recipients".
    if cc and isinstance(cc, six.string_types):
        cc = [cc]
    if bcc and isinstance(bcc, six.string_types):
        bcc = [bcc]
    email_kwargs = dict(subject=subject, body=message, sender=sender, to=to,
                        cc=cc, bcc=bcc, attachments=attachments,
                        connection=connection)
    if html_message is None:
        email = EmailMessage(**email_kwargs)
    else:
        email = EmailMultiAlternatives(**email_kwargs)
        email.attach_alternative(html_message, 'text/html')
    return email.send()
def send_mass_mail(datatuple, fail_silently=False, auth_user=None,
                   auth_password=None):
    """Send one message per (subject, message, sender, recipient_list)
    entry of ``datatuple`` over a single SMTPConnection.

    ``auth_user`` and ``auth_password`` are handed to the connection as its
    login credentials. The return value is whatever the connection's
    ``send_messages`` returns.

    Note: The API for this method is frozen. New code wanting to extend the
    functionality should use the EmailMessage class directly.
    """
    connection = SMTPConnection(username=auth_user, password=auth_password,
                                fail_silently=fail_silently)
    outgoing = []
    for subject, message, sender, recipient in datatuple:
        outgoing.append(EmailMessage(subject, message, sender, recipient))
    return connection.send_messages(outgoing)