"""Functions for reading to certain bytes, e.g. delimiters."""
# Copyright (c) 2009-2012 Darcy Mason
# This file is part of pydicom, released under a modified MIT license.
# See the file LICENSE included with this distribution, also
# available at https://github.com/pydicom/pydicom
from struct import pack, unpack
from pydicom.misc import size_in_bytes
from pydicom.tag import TupleTag, Tag
from pydicom.datadict import dictionary_description
from pydicom.config import logger
[docs]def absorb_delimiter_item(fp, is_little_endian, delimiter):
"""Read (and ignore) undefined length sequence or item terminators."""
if is_little_endian:
struct_format = "<HHL"
else:
struct_format = ">HHL"
group, elem, length = unpack(struct_format, fp.read(8))
tag = TupleTag((group, elem))
if tag != delimiter:
msg = ("Did not find expected delimiter '%s'" %
dictionary_description(delimiter))
msg += ", instead found %s at file position 0x%x" % (
str(tag), fp.tell() - 8)
logger.warn(msg)
fp.seek(fp.tell() - 8)
return
logger.debug("%04x: Found Delimiter '%s'", fp.tell() - 8,
dictionary_description(delimiter))
if length == 0:
logger.debug("%04x: Read 0 bytes after delimiter", fp.tell() - 4)
else:
logger.debug("%04x: Expected 0x00000000 after delimiter, found 0x%x",
fp.tell() - 4, length)
[docs]def find_bytes(fp, bytes_to_find, read_size=128, rewind=True):
"""Read in the file until a specific byte sequence found.
Parameters
----------
fp : file-like object
bytes_to_find : str
Contains the bytes to find. Must be in correct
endian order already.
read_size : int
Number of bytes to read at a time.
rewind : boolean
Flag to rewind file reading position.
Returns
-------
found_at : byte, None
Position where byte sequence was found, else None.
"""
data_start = fp.tell()
search_rewind = len(bytes_to_find) - 1
found = False
eof = False
while not found:
chunk_start = fp.tell()
bytes_read = fp.read(read_size)
if len(bytes_read) < read_size:
# try again - if still don't get required amount,
# this is the last block
new_bytes = fp.read(read_size - len(bytes_read))
bytes_read += new_bytes
if len(bytes_read) < read_size:
eof = True # but will still check whatever we did get
index = bytes_read.find(bytes_to_find)
if index != -1:
found = True
elif eof:
if rewind:
fp.seek(data_start)
return None
else:
# rewind a bit in case delimiter crossed read_size boundary
fp.seek(fp.tell() - search_rewind)
# if get here then have found the byte string
found_at = chunk_start + index
if rewind:
fp.seek(data_start)
else:
fp.seek(found_at + len(bytes_to_find))
return found_at
[docs]def read_undefined_length_value(fp,
is_little_endian,
delimiter_tag,
defer_size=None,
read_size=1024*8):
"""Read until the delimiter tag found and return the value;
ignore the delimiter.
On completion, the file will be set to the first byte after the delimiter
and its following four zero bytes.
Parameters
----------
fp : a file-like object
is_little_endian : boolean
True if file transfer syntax is little endian, else False.
delimiter_tag : BaseTag
tag used as and marker for reading
defer_size : int, None, optional
Size to avoid loading large elements in memory.
See ``filereader.dcmread`` for more parameter info.
read_size : int
Number of bytes to read at one time.
Returns
-------
delimiter : str, None
The file delimiter
Raises
------
EOFError
If EOF is reached before delimiter found.
"""
data_start = fp.tell()
search_rewind = 3
if is_little_endian:
bytes_format = b"<HH"
else:
bytes_format = b">HH"
bytes_to_find = pack(bytes_format, delimiter_tag.group, delimiter_tag.elem)
found = False
eof = False
value_chunks = []
defer_size = size_in_bytes(defer_size)
byte_count = 0 # for defer_size checks
while not found:
chunk_start = fp.tell()
bytes_read = fp.read(read_size)
if len(bytes_read) < read_size:
# try again - if still don't get required amount,
# this is the last block
new_bytes = fp.read(read_size - len(bytes_read))
bytes_read += new_bytes
if len(bytes_read) < read_size:
eof = True # but will still check whatever we did get
index = bytes_read.find(bytes_to_find)
if index != -1:
found = True
new_bytes = bytes_read[:index]
byte_count += len(new_bytes)
if defer_size is None or byte_count < defer_size:
value_chunks.append(bytes_read[:index])
fp.seek(chunk_start + index + 4) # rewind to end of delimiter
length = fp.read(4)
if length != b"\0\0\0\0":
msg = ("Expected 4 zero bytes after undefined length delimiter"
" at pos {0:04x}")
logger.error(msg.format(fp.tell() - 4))
elif eof:
fp.seek(data_start)
raise EOFError("End of file reached before delimiter {0!r} found".
format(delimiter_tag))
else:
# rewind a bit in case delimiter crossed read_size boundary
fp.seek(fp.tell() - search_rewind)
# accumulate the bytes read (not including the rewind)
new_bytes = bytes_read[:-search_rewind]
byte_count += len(new_bytes)
if defer_size is None or byte_count < defer_size:
value_chunks.append(new_bytes)
# if get here then have found the byte string
if defer_size is not None and byte_count >= defer_size:
return None
else:
return b"".join(value_chunks)
[docs]def find_delimiter(fp, delimiter, is_little_endian, read_size=128,
rewind=True):
"""Return file position where 4-byte delimiter is located.
Parameters
----------
delimiter :
is_little_endian : boolean
read_size : int
See ``find_bytes`` for parameter info.
rewind : boolean
Flag to rewind to initial position after searching.
Returns
-------
file position of delimiter, None
Returns None if end of file is reached without finding the delimiter.
"""
struct_format = "<H"
if not is_little_endian:
struct_format = ">H"
delimiter = Tag(delimiter)
bytes_to_find = pack(struct_format, delimiter.group) + pack(
struct_format, delimiter.elem)
return find_bytes(fp, bytes_to_find, read_size=read_size, rewind=rewind)
[docs]def length_of_undefined_length(fp,
delimiter,
is_little_endian,
read_size=128,
rewind=True):
"""Search through the file to find the delimiter and return the length
of the data element.
Parameters
----------
fp : file-like object
delimiter :
See ``find_delimiter`` for parameter info.
is_little_endian : boolean
read_size : int
See ``find_bytes`` for parameter info.
rewind : boolean
Flag to rewind to initial position after searching.
Returns
-------
length to delimiter
Notes
-----
Note the data element that the delimiter starts is not read here,
the calling routine must handle that. Delimiter must be 4 bytes long.
"""
data_start = fp.tell()
delimiter_pos = find_delimiter(
fp, delimiter, is_little_endian, rewind=rewind)
length = delimiter_pos - data_start
return length
[docs]def read_delimiter_item(fp, delimiter):
"""Read and ignore an expected delimiter.
If the delimiter is not found or correctly formed, a warning is logged.
"""
found = fp.read(4)
if found != delimiter:
logger.warn("Expected delimitor %s, got %s at file position 0x%x",
Tag(delimiter), Tag(found), fp.tell() - 4)
length = fp.read_UL()
if length != 0:
logger.warn("Expected delimiter item to have length 0, "
"got %d at file position 0x%x", length, fp.tell() - 4)