Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Lib/hmac.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(self, key, msg=None, digestmod=''):
raise TypeError("key: expected bytes or bytearray, but got %r" % type(key).__name__)

if not digestmod:
raise TypeError("Missing required parameter 'digestmod'.")
raise TypeError("Missing required argument 'digestmod'.")

if _hashopenssl and isinstance(digestmod, (str, _functype)):
try:
Expand Down
96 changes: 84 additions & 12 deletions Lib/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,56 @@ def get_file(self, key):
f = open(os.path.join(self._path, self._lookup(key)), 'rb')
return _ProxyFile(f)

def get_info(self, key):
"""Get the keyed message's "info" as a string."""
subpath = self._lookup(key)
if self.colon in subpath:
return subpath.split(self.colon)[-1]
return ''

def set_info(self, key, info: str):
"""Set the keyed message's "info" string."""
if not isinstance(info, str):
raise TypeError(f'info must be a string: {type(info)}')
old_subpath = self._lookup(key)
new_subpath = old_subpath.split(self.colon)[0]
if info:
new_subpath += self.colon + info
if new_subpath == old_subpath:
return
old_path = os.path.join(self._path, old_subpath)
new_path = os.path.join(self._path, new_subpath)
os.rename(old_path, new_path)
self._toc[key] = new_subpath

def get_flags(self, key):
"""Return as a string the standard flags that are set on the keyed message."""
info = self.get_info(key)
if info.startswith('2,'):
return info[2:]
return ''

def set_flags(self, key, flags: str):
"""Set the given flags and unset all others on the keyed message."""
if not isinstance(flags, str):
raise TypeError(f'flags must be a string: {type(flags)}')
# TODO: check if flags are valid standard flag characters?
self.set_info(key, '2,' + ''.join(sorted(set(flags))))

def add_flag(self, key, flag: str):
"""Set the given flag(s) without changing others on the keyed message."""
if not isinstance(flag, str):
raise TypeError(f'flag must be a string: {type(flag)}')
# TODO: check that flag is a valid standard flag character?
self.set_flags(key, ''.join(set(self.get_flags(key)) | set(flag)))

def remove_flag(self, key, flag: str):
"""Unset the given string flag(s) without changing others on the keyed message."""
if not isinstance(flag, str):
raise TypeError(f'flag must be a string: {type(flag)}')
if self.get_flags(key):
self.set_flags(key, ''.join(set(self.get_flags(key)) - set(flag)))

def iterkeys(self):
"""Return an iterator over keys."""
self._refresh()
Expand Down Expand Up @@ -540,6 +590,8 @@ def _refresh(self):
for subdir in self._toc_mtimes:
path = self._paths[subdir]
for entry in os.listdir(path):
if entry.startswith('.'):
continue
p = os.path.join(path, entry)
if os.path.isdir(p):
continue
Expand Down Expand Up @@ -698,9 +750,13 @@ def flush(self):
_sync_close(new_file)
# self._file is about to get replaced, so no need to sync.
self._file.close()
# Make sure the new file's mode is the same as the old file's
mode = os.stat(self._path).st_mode
os.chmod(new_file.name, mode)
# Make sure the new file's mode and owner are the same as the old file's
info = os.stat(self._path)
os.chmod(new_file.name, info.st_mode)
try:
os.chown(new_file.name, info.st_uid, info.st_gid)
except (AttributeError, OSError):
pass
try:
os.rename(new_file.name, self._path)
except FileExistsError:
Expand Down Expand Up @@ -778,10 +834,11 @@ def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.seek(start)
from_line = self._file.readline().replace(linesep, b'')
from_line = self._file.readline().replace(linesep, b'').decode('ascii')
string = self._file.read(stop - self._file.tell())
msg = self._message_factory(string.replace(linesep, b'\n'))
msg.set_from(from_line[5:].decode('ascii'))
msg.set_unixfrom(from_line)
msg.set_from(from_line[5:])
return msg

def get_string(self, key, from_=False):
Expand Down Expand Up @@ -1089,10 +1146,24 @@ def __len__(self):
"""Return a count of messages in the mailbox."""
return len(list(self.iterkeys()))

def _open_mh_sequences_file(self, text):
mode = '' if text else 'b'
kwargs = {'encoding': 'ASCII'} if text else {}
path = os.path.join(self._path, '.mh_sequences')
while True:
try:
return open(path, 'r+' + mode, **kwargs)
except FileNotFoundError:
pass
try:
return open(path, 'x+' + mode, **kwargs)
except FileExistsError:
pass

def lock(self):
"""Lock the mailbox."""
if not self._locked:
self._file = open(os.path.join(self._path, '.mh_sequences'), 'rb+')
self._file = self._open_mh_sequences_file(text=False)
_lock_file(self._file)
self._locked = True

Expand Down Expand Up @@ -1146,7 +1217,11 @@ def remove_folder(self, folder):
def get_sequences(self):
"""Return a name-to-key-list dictionary to define each sequence."""
results = {}
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
try:
f = open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII')
except FileNotFoundError:
return results
with f:
all_keys = set(self.keys())
for line in f:
try:
Expand All @@ -1169,7 +1244,7 @@ def get_sequences(self):

def set_sequences(self, sequences):
"""Set sequences using the given name-to-key-list dictionary."""
f = open(os.path.join(self._path, '.mh_sequences'), 'r+', encoding='ASCII')
f = self._open_mh_sequences_file(text=True)
try:
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
for name, keys in sequences.items():
Expand Down Expand Up @@ -1956,10 +2031,7 @@ def readlines(self, sizehint=None):

def __iter__(self):
"""Iterate over lines."""
while True:
line = self.readline()
if not line:
return
while line := self.readline():
yield line

def tell(self):
Expand Down
8 changes: 8 additions & 0 deletions Lib/test/test_hmac.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,14 @@ def test_exercise_all_methods(self):
self.fail("Exception raised during normal usage of HMAC class.")


class UpdateTestCase(unittest.TestCase):
@hashlib_helper.requires_hashdigest('sha256')
def test_with_str_update(self):
with self.assertRaises(TypeError):
h = hmac.new(b"key", digestmod='sha256')
h.update("invalid update")


class CopyTestCase(unittest.TestCase):

@hashlib_helper.requires_hashdigest('sha256')
Expand Down
Loading
Loading