"""
ACSE service provider
"""
import logging
from pydicom.uid import UID
from pynetdicom.pdu_primitives import (
A_ASSOCIATE, A_RELEASE, A_ABORT, A_P_ABORT,
AsynchronousOperationsWindowNegotiation,
MaximumLengthNotification,
SOPClassCommonExtendedNegotiation,
SOPClassExtendedNegotiation,
UserIdentityNegotiation,
)
from pynetdicom.presentation import (
negotiate_as_requestor, negotiate_as_acceptor
)
from pynetdicom.utils import pretty_bytes
LOGGER = logging.getLogger('pynetdicom.acse')
# DICOM Application Context Name - see Part 7, Annex A.2.1
APPLICATION_CONTEXT_NAME = UID('1.2.840.10008.3.1.1.1')
[docs]class ACSE(object):
"""The Association Control Service Element (ACSE) service provider.
The ACSE protocol handles association establishment, normal release of an
association and the abnormal release of an association.
Attributes
----------
acse_timeout : int
The maximum time (in seconds) to wait for association related PDUs
from the peer.
"""
[docs] def __init__(self, acse_timeout=30):
"""Create the ACSE service provider.
Parameters
----------
acse_timeout : int, optional
The maximum time (in seconds) to wait for A-ASSOCIATE related PDUs
from the peer (default: 30)
"""
self.acse_timeout = acse_timeout
@staticmethod
def _check_async_ops(assoc):
"""Check the user's response to an Asynchronous Operations request.
Parameters
----------
assoc : association.Association
The Association instance that received the Asynchronous Operations
Window Negotiation item in an A-ASSOCIATE (request) primitive.
Returns
-------
pdu_primitives.AsynchronousOperationsWindowNegotiation or None
If the `AE.on_async_ops_window` callback hasn't been implemented
then returns None, otherwise returns an
AsynchronousOperationsWindowNegotiation item with the default
values for the number of operations invoked/performed (1, 1).
"""
# pylint: disable=broad-except
try:
# Response is always ignored as async ops is not supported
_ = assoc.ae.on_async_ops_window(
*assoc.requestor.asynchronous_operations
)
except NotImplementedError:
return None
except Exception as exc:
LOGGER.error(
"Exception raised in user's 'on_async_ops_window' "
"implementation"
)
LOGGER.exception(exc)
item = AsynchronousOperationsWindowNegotiation()
item.maximum_number_operations_invoked = 1
item.maximum_number_operations_performed = 1
return item
@staticmethod
def _check_sop_class_common_extended(assoc):
"""Check the user's response to a SOP Class Common Extended request.
Parameters
----------
assoc : association.Association
The Association instance that received one or more SOP Class
Common Extended Negotiation items in an A-ASSOCIATE (request)
primitive.
Returns
-------
dict
The {SOP Class UID : SOPClassCommonExtendedNegotiation} items for
the accepted SOP Class Common Extended negotiation items.
"""
# pylint: disable=broad-except
try:
rsp = assoc.ae.on_sop_class_common_extended(
assoc.requestor.sop_class_common_extended
)
except Exception as exc:
LOGGER.error(
"Exception raised in user's 'on_sop_class_common_extended' "
"implementation"
)
LOGGER.exception(exc)
return {}
rsp = {
uid:ii for uid,ii in rsp.items()
if isinstance(ii, SOPClassCommonExtendedNegotiation)
}
return rsp
@staticmethod
def _check_sop_class_extended(assoc):
"""Check the user's response to a SOP Class Extended request.
Parameters
----------
assoc : association.Association
The Association instance that received one or more SOP Class
Extended Negotiation items in an A-ASSOCIATE (request) primitive.
Returns
-------
list of pdu_primitives.SOPClassExtendedNegotiation
The SOP Class Extended Negotiation items to be sent in response
"""
# pylint: disable=broad-except
try:
user_response = assoc.ae.on_sop_class_extended(
assoc.requestor.sop_class_extended
)
except Exception as exc:
user_response = None
LOGGER.error(
"Exception raised in user's 'on_sop_class_extended' "
"implementation"
)
LOGGER.exception(exc)
if not isinstance(user_response, (type(None), dict)):
LOGGER.error(
"Invalid type returned by user's 'on_sop_class_extended' "
"implementation"
)
user_response = None
if user_response is None:
return []
items = []
for sop_class, app_info in user_response.items():
try:
item = SOPClassExtendedNegotiation()
item.sop_class_uid = sop_class
item.service_class_application_information = app_info
items.append(item)
except Exception as exc:
LOGGER.error(
"Unable to set the SOP Class Extended Negotiation "
"response values for the SOP Class UID {}"
.format(sop_class)
)
LOGGER.exception(exc)
return items
@staticmethod
def _check_user_identity(assoc):
"""Check the user's response to a User Identity request.
Parameters
----------
assoc : association.Association
The Association instance that received the User Identity
Negotiation item in an A-ASSOCIATE (request) primitive.
Returns
-------
bool
True if the user identity has been confirmed, False otherwise.
pdu_primitives.UserIdentityNegotiation or None
The negotiation response, if a positive response is requested,
otherwise None.
"""
# pylint: disable=broad-except
# The UserIdentityNegotiation (request) item
req = assoc.requestor.user_identity
try:
identity_verified, response = assoc.ae.on_user_identity(
req.user_identity_type,
req.primary_field,
req.secondary_field,
{
'requestor' : assoc.requestor.info,
}
)
except NotImplementedError:
# If the user hasn't implemented identity negotiation then
# default to accepting the association
return True, None
except Exception as exc:
# If the user has implemented identity negotiation but an exception
# occurred then reject the association
LOGGER.error("Exception in handling user identity negotiation")
LOGGER.exception(exc)
return False, None
if not identity_verified:
# Reject association as the user isn't authorised
return False, None
if req.user_identity_type in [3, 4, 5]:
if req.positive_response_requested and response is not None:
try:
rsp = UserIdentityNegotiation()
rsp.server_response = response
return True, rsp
except Exception as exc:
# > If the acceptor doesn't support user identification it
# > will accept the association without making a positive
# > response
LOGGER.error(
"Unable to set the User Identity Negotiation's "
"'server_response'"
)
LOGGER.exception(exc)
return True, None
return True, None
[docs] @staticmethod
def is_aborted(assoc):
"""Return True if an A-ABORT or A-P-ABORT request has been received."""
primitive = assoc.dul.peek_next_pdu()
if primitive.__class__ in (A_ABORT, A_P_ABORT):
return True
return False
[docs] @staticmethod
def is_released(assoc):
"""Return True if an A-RELEASE request has been received."""
primitive = assoc.dul.peek_next_pdu()
if isinstance(primitive, A_RELEASE):
# Make sure this is an A-RELEASE *request* primitive
# response primitives have the Result field as 'affirmative'
if primitive.result == 'affirmative':
return False
return True
return False
[docs] def negotiate_association(self, assoc):
"""Perform an association negotiation as either the requestor or
acceptor.
Parameters
----------
assoc : association.Association
The Association instance to perform the negotiation for.
"""
if assoc.is_requestor:
self._negotiate_as_requestor(assoc)
elif assoc.is_acceptor:
self._negotiate_as_acceptor(assoc)
def _negotiate_as_acceptor(self, assoc):
"""Perform an association negotiation as the association acceptor.
Parameters
----------
assoc : association.Association
The Association instance to perform the negotiation for.
"""
# For convenience
assoc_rq = assoc.requestor.primitive
assoc.requestor.ae_title = assoc_rq.calling_ae_title
# If we reject association -> [result, source, diagnostic]
reject_assoc_rsd = []
# Calling AE Title not recognised
if (assoc.ae.require_calling_aet != b''
and assoc.ae.require_calling_aet != assoc_rq.calling_ae_title):
reject_assoc_rsd = [0x01, 0x01, 0x03]
# Called AE Title not recognised
if (assoc.ae.require_called_aet != b''
and assoc.ae.require_called_aet != assoc_rq.called_ae_title):
reject_assoc_rsd = [0x01, 0x01, 0x07]
## Extended Negotiation items
# User Identity Negotiation items
if assoc.requestor.user_identity:
is_valid, id_response = self._check_user_identity(assoc)
if not is_valid:
# Transient, ACSE related, no reason given
LOGGER.info("User identity failed verification")
reject_assoc_rsd = [0x02, 0x02, 0x01]
if id_response:
# Add the User Identity Negotiation (response) item
assoc.acceptor.add_negotiation_item(id_response)
# SOP Class Extended Negotiation items
for item in self._check_sop_class_extended(assoc):
assoc.acceptor.add_negotiation_item(item)
# SOP Class Common Extended Negotiation items
# Note: No response items are allowed
assoc.acceptor._common_ext = (
self._check_sop_class_common_extended(assoc)
)
# Asynchronous Operations Window Negotiation items
if assoc.requestor.asynchronous_operations != (1, 1):
async_rsp = self._check_async_ops(assoc)
# Add any Async Ops (response) item
if async_rsp:
assoc.acceptor.add_negotiation_item(async_rsp)
## DUL Presentation Related Rejections
# Maximum number of associations reached (local-limit-exceeded)
if len(assoc.ae.active_associations) > assoc.ae.maximum_associations:
reject_assoc_rsd = [0x02, 0x03, 0x02]
if reject_assoc_rsd:
# pylint: disable=no-value-for-parameter
self.send_reject(assoc, *reject_assoc_rsd)
assoc.debug_association_rejected(assoc.acceptor.primitive)
assoc.ae.on_association_rejected(assoc.acceptor.primitive)
assoc.kill()
return
## Negotiate Presentation Contexts
# SCP/SCU Role Selection Negotiation request items
# {SOP Class UID : (SCU role, SCP role)}
rq_roles = {
uid:(item.scu_role, item.scp_role)
for uid, item in assoc.requestor.role_selection.items()
}
result, ac_roles = negotiate_as_acceptor(
assoc_rq.presentation_context_definition_list,
assoc.acceptor.supported_contexts,
rq_roles
)
# pylint: disable=protected-access
assoc._accepted_cx = [cx for cx in result if cx.result == 0x00]
assoc._rejected_cx = [cx for cx in result if cx.result != 0x00]
# pylint: enable=protected-access
# Add any SCP/SCU Role Selection Negotiation response items
for item in ac_roles:
assoc.acceptor.add_negotiation_item(item)
# Set maximum PDU send length
# Unlimited PDU size - set to 64K as this is big enough to max
# out most protocols
# FIXME: unlimited length should be handled correctly
if assoc.requestor.maximum_length == 0:
for item in assoc.requestor.user_information:
if isinstance(item, MaximumLengthNotification):
item.maximum_length_received = 0x10000
break
# Send the A-ASSOCIATE (accept) primitive
self.send_accept(assoc)
# Callbacks/Logging
assoc.debug_association_accepted(assoc.acceptor.primitive)
assoc.ae.on_association_accepted(assoc.acceptor.primitive)
# No valid presentation contexts, abort the association
if not assoc.accepted_contexts:
self.send_abort(assoc, 0x02)
assoc.kill()
return
# Assocation established OK
assoc.is_established = True
def _negotiate_as_requestor(self, assoc):
"""Perform an association negotiation as the association requestor.
Parameters
----------
assoc : association.Association
The Association instance to perform the negotiation for.
"""
if not assoc.requestor.requested_contexts:
LOGGER.error(
"One or more requested presentation contexts must be set "
"prior to association negotiation"
)
assoc.kill()
return
# Build and send an A-ASSOCIATE (request) PDU to the peer
self.send_request(assoc)
# Wait for response
rsp = assoc.dul.receive_pdu(wait=True, timeout=assoc.acse_timeout)
# Association accepted or rejected
if isinstance(rsp, A_ASSOCIATE):
assoc.acceptor.primitive = rsp
# Accepted
if rsp.result == 0x00:
## Handle SCP/SCU Role Selection response
# Apply requestor's proposed SCP/SCU role selection (if any)
# to the requested contexts
rq_roles = {
uid:(ii.scu_role, ii.scp_role)
for uid, ii in assoc.requestor.role_selection.items()
}
if rq_roles:
for cx in assoc.requestor.requested_contexts:
try:
(cx.scu_role, cx.scp_role) = rq_roles[
cx.abstract_syntax
]
except KeyError:
pass
# Collate the acceptor's SCP/SCU role selection responses
ac_roles = {
uid:(ii.scu_role, ii.scp_role)
for uid, ii in assoc.acceptor.role_selection.items()
}
# Check the negotiated presentation contexts results and
# determine their agreed upon SCP/SCU roles
negotiated_contexts = negotiate_as_requestor(
assoc.requestor.requested_contexts,
rsp.presentation_context_definition_results_list,
ac_roles
)
# pylint: disable=protected-access
assoc._accepted_cx = [
cx for cx in negotiated_contexts if cx.result == 0x00
]
assoc._rejected_cx = [
cx for cx in negotiated_contexts if cx.result != 0x00
]
# pylint: enable=protected-access
assoc.debug_association_accepted(rsp)
assoc.ae.on_association_accepted(rsp)
# No acceptable presentation contexts
if not assoc.accepted_contexts:
LOGGER.error("No accepted presentation contexts")
self.send_abort(assoc, 0x02)
assoc.is_aborted = True
assoc.is_established = False
assoc.kill()
else:
assoc.is_established = True
elif rsp.result in [0x01, 0x02]:
# 0x01 is rejected (permanent)
# 0x02 is rejected (transient)
assoc.ae.on_association_rejected(rsp)
assoc.debug_association_rejected(rsp)
assoc.is_rejected = True
assoc.is_established = False
assoc.dul.kill_dul()
else:
LOGGER.error(
"Received an invalid A-ASSOCIATE 'Result' value from "
"the peer: '0x{:02x}'".format(rsp.result)
)
self.send_abort(assoc, 0x02)
assoc.is_aborted = True
assoc.is_established = False
assoc.kill()
# Association aborted
elif isinstance(rsp, (A_ABORT, A_P_ABORT)):
assoc.ae.on_association_aborted(rsp)
assoc.debug_association_aborted(rsp)
assoc.is_established = False
assoc.is_aborted = True
assoc.dul.kill_dul()
else:
assoc.is_established = False
assoc.dul.kill_dul()
LOGGER.error(
"Received an invalid response to the A-ASSOCIATE request"
)
[docs] def release_association(self, assoc):
"""Release an established association.
Sends an A-RELEASE request and waits for the response. If no response
is received then aborts the association instead.
Parameters
----------
assoc : association.Association
The Association instance to release.
"""
self.send_release(assoc, is_response=False)
rsp = assoc.dul.receive_pdu(wait=True, timeout=self.acse_timeout)
try:
assert rsp.result == 'affirmative'
assoc.is_released = True
assoc.is_established = False
assoc.kill()
except (AttributeError, AssertionError):
LOGGER.error(
"Received an invalid response to the A-RELEASE request"
)
assoc.abort()
[docs] @staticmethod
def send_abort(assoc, source):
"""Send an A-ABORT request to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-ABORT.
source : int
The source of the abort request
- 0x00 - the DUL service user
- 0x02 - the DUL service provider
Raises
------
ValueError
If the `source` value is invalid.
"""
if source not in [0x00, 0x02]:
raise ValueError("Invalid 'source' parameter value")
# The following parameters must be set for an A-ABORT primitive
# (* sent in A-ABORT PDU):
# Abort Source*
# Provider Reason* (not significant with source 0x00)
primitive = A_ABORT()
primitive.abort_source = source
assoc.dul.send_pdu(primitive)
assoc.is_aborted = True
assoc.is_established = False
[docs] @staticmethod
def send_accept(assoc):
"""Send an A-ASSOCIATE (accept) to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-ASSOCIATE (accept).
"""
# The following parameters must be set for an A-ASSOCIATE (accept)
# primitive (* sent in A-ASSOCIATE-AC PDU):
# Application Context Name*
# Calling AE Title* (but not to be tested)
# Called AE Title* (but not to be tested)
# User Information
# Maximum PDV Length*
# Implementation Class UID*
# Result
# Result Source
# Presentation Context Definition List Result*
primitive = A_ASSOCIATE()
primitive.application_context_name = APPLICATION_CONTEXT_NAME
primitive.calling_ae_title = assoc.requestor.primitive.calling_ae_title
primitive.called_ae_title = assoc.requestor.primitive.called_ae_title
primitive.result = 0x00
primitive.result_source = 0x01
primitive.presentation_context_definition_results_list = (
assoc.accepted_contexts
)
## User Information - PS3.7 Annex D.3.3
primitive.user_information = assoc.acceptor.user_information
assoc.acceptor.primitive = primitive
assoc.dul.send_pdu(primitive)
[docs] @staticmethod
def send_ap_abort(assoc, reason):
"""Send an A-P-ABORT to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-P-ABORT.
reason : int
The reason for aborting the association, one of the following:
- 0x00 - reason not specified
- 0x01 - unrecognised PDU
- 0x02 - unexpected PDU
- 0x04 - unrecognised PDU parameter
- 0x05 - unexpected PDU parameter
- 0x06 - invalid PDU parameter value
Raises
------
ValueError
If the `reason` value is invalid.
"""
if reason not in [0x00, 0x01, 0x02, 0x04, 0x05, 0x06]:
raise ValueError("Invalid 'reason' parameter value")
# The following parameters must be set for an A-P-ABORT primitive
# (* sent in A-ABORT PDU):
# Abort Source* (always 0x02)
# Provider Reason*
primitive = A_P_ABORT()
primitive.provider_reason = reason
assoc.dul.send_pdu(primitive)
assoc.is_aborted = True
assoc.is_established = False
[docs] @staticmethod
def send_reject(assoc, result, source, diagnostic):
"""Send an A-ASSOCIATE (reject) to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-ASSOCIATE (reject).
result : int
The association rejection:
- 0x01 - rejected permanent
- 0x02 - rejected transient
source : int
The source of the rejection:
- 0x01 - DUL service user
- 0x02 - DUL service provider (ACSE related)
- 0x03 - DUL service provider (presentation related)
diagnostic : int
The reason for the rejection, if the source is 0x01:
- 0x01 - no reason given
- 0x02 - application context name not supported
- 0x03 - calling AE title not recognised
- 0x07 - called AE title not recognised
If the source is 0x02:
- 0x01 - no reason given
- 0x02 - protocol version not supported
If the source is 0x03:
- 0x01 - temporary congestion
- 0x02 - local limit exceeded
"""
if result not in [0x01, 0x02]:
raise ValueError("Invalid 'result' parameter value")
_valid_reason_diagnostic = {
0x01 : [0x01, 0x02, 0x03, 0x07],
0x02 : [0x01, 0x02],
0x03 : [0x01, 0x02],
}
try:
if diagnostic not in _valid_reason_diagnostic[source]:
raise ValueError(
"Invalid 'diagnostic' parameter value"
)
except KeyError:
raise ValueError("Invalid 'source' parameter value")
# The following parameters must be set for an A-ASSOCIATE (reject)
# primitive (* sent in A-ASSOCIATE-RJ PDU):
# Result*
# Result Source*
# Diagnostic*
primitive = A_ASSOCIATE()
primitive.result = result
primitive.result_source = source
primitive.diagnostic = diagnostic
assoc.acceptor.primitive = primitive
assoc.dul.send_pdu(primitive)
assoc.is_rejected = True
assoc.is_established = False
[docs] @staticmethod
def send_release(assoc, is_response=False):
"""Send an A-RELEASE (request or response) to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-RELEASE.
is_response : bool, optional
True to send an A-RELEASE (response) to the peer, False
to send an A-RELEASE (request) to the peer (default).
"""
primitive = A_RELEASE()
if is_response:
primitive.result = "affirmative"
assoc.dul.send_pdu(primitive)
[docs] @staticmethod
def send_request(assoc):
"""Send an A-ASSOCIATE (request) to the peer.
Parameters
----------
assoc : pynetdicom.association.Association
The association that is sending the A-ASSOCIATE (request).
"""
# The following parameters must be set for a request primitive
# (* sent in A-ASSOCIATE-RQ PDU)
# Application Context Name*
# Calling AE Title*
# Called AE Title*
# UserInformation*
# Maximum PDV Length*
# Implementation Class UID*
# Calling Presentation Address
# Called Presentation Address
# Presentation Context Definition List*
primitive = A_ASSOCIATE()
# DICOM Application Context Name, see PS3.7 Annex A.2.1
primitive.application_context_name = APPLICATION_CONTEXT_NAME
# Calling AE Title is the source DICOM AE title
primitive.calling_ae_title = assoc.requestor.ae_title
# Called AE Title is the destination DICOM AE title
primitive.called_ae_title = assoc.acceptor.ae_title
# The TCP/IP address of the source, pynetdicom includes port too
primitive.calling_presentation_address = (
assoc.requestor.address, assoc.requestor.port
)
# The TCP/IP address of the destination, pynetdicom includes port too
primitive.called_presentation_address = (
assoc.acceptor.address, assoc.acceptor.port
)
# Proposed presentation contexts
primitive.presentation_context_definition_list = (
assoc.requestor.requested_contexts
)
## User Information - PS3.7 Annex D.3.3
# Mandatory items:
# Maximum Length Notification (1)
# Implementation Class UID Notification (1)
# Optional notification items:
# Implementation Version Name Notification (0 or 1)
# Optional negotiation items:
# SCP/SCU Role Selection Negotiation (0 or N)
# Asynchronous Operations Window Negotiation (0 or 1)
# SOP Class Extended Negotiation (0 or N)
# SOP Class Common Extended Negotiation (0 or N)
# User Identity Negotiation (0 or 1)
primitive.user_information = assoc.requestor.user_information
# Save the request primitive
assoc.requestor.primitive = primitive
# Send the A-ASSOCIATE request primitive to the peer
LOGGER.info("Requesting Association")
assoc.dul.send_pdu(primitive)
# ACSE logging/debugging functions
[docs] @staticmethod
def debug_send_abort(a_abort_rq):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-ABORT to a peer AE
Parameters
----------
a_abort : pdu.A_ABORT_RQ
The A-ABORT PDU instance
"""
LOGGER.info('Aborting Association')
[docs] @staticmethod
def debug_send_associate_ac(a_associate_ac):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-ASSOCIATE-AC to a peer AE
Parameters
----------
a_associate_ac : pdu.A_ASSOCIATE_AC
The A-ASSOCIATE-AC PDU instance
"""
LOGGER.info("Association Accepted")
# Shorthand
assoc_ac = a_associate_ac
# Needs some cleanup
app_context = assoc_ac.application_context_name.title()
pres_contexts = assoc_ac.presentation_context
user_info = assoc_ac.user_information
async_ops = user_info.async_ops_window
roles = user_info.role_selection
responding_ae = 'resp. AE Title'
s = ['Accept Parameters:']
s.append('====================== BEGIN A-ASSOCIATE-AC ================'
'=====')
s.append('Our Implementation Class UID: '
'{0!s}'.format(user_info.implementation_class_uid))
if user_info.implementation_version_name:
s.append(
"Our Implementation Version Name: {0!s}".format(
user_info.implementation_version_name.decode('ascii')
)
)
s.append('Application Context Name: {0!s}'.format(app_context))
s.append('Responding Application Name: {0!s}'.format(responding_ae))
s.append('Our Max PDU Receive Size: '
'{0!s}'.format(user_info.maximum_length))
s.append('Presentation Contexts:')
if not pres_contexts:
s.append(' (no valid presentation contexts)')
# Sort by context ID
for item in sorted(pres_contexts, key=lambda x: x.context_id):
s.append(' Context ID: {0!s} ({1!s})'
.format(item.context_id, item.result_str))
# If Presentation Context was accepted
if item.result == 0:
#if item.scu_role is None and item.scp_role is None:
# ac_scp_scu_role = 'Default'
#else:
# ac_scp_scu_role = '{0!s}/{1!s}'.format(item.scp_role,
#item.scu_role)
#s.append(' Accepted SCP/SCU Role: {0!s}'
# .format(ac_scp_scu_role))
s.append(' Accepted Transfer Syntax: ={0!s}'
.format(item.transfer_syntax.name))
## Role Selection
if roles:
s.append("Accepted Role Selection:")
for uid in sorted(roles.keys()):
s.append(" SOP Class: ={}".format(uid.name))
str_roles = []
if roles[uid].scp_role:
str_roles.append('SCP')
if roles[uid].scu_role:
str_roles.append('SCU')
str_roles = '/'.join(str_roles)
s.append(" SCP/SCU Role: {}".format(str_roles))
## Extended Negotiation
if user_info.ext_neg:
s.append('Accepted Extended Negotiation:')
for item in user_info.ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.uid))
app_info = pretty_bytes(item.app_info)
app_info[0] = '[' + app_info[0][1:]
app_info[-1] = app_info[-1] + ' ]'
for line in app_info:
s.append(' {0!s}'.format(line))
else:
s.append('Accepted Extended Negotiation: None')
## Asynchronous Operations
if async_ops:
s.append(
"Accepted Asynchronous Operations Window Negotiation:"
)
s.append(
" Maximum Invoked Operations: {}"
.format(async_ops.maximum_number_operations_invoked)
)
s.append(
" Maximum Performed Operations: {}"
.format(async_ops.maximum_number_operations_performed)
)
else:
s.append(
"Accepted Asynchronous Operations Window Negotiation: None"
)
## User Identity Negotiation
usr_id = 'Yes' if user_info.user_identity is not None else 'None'
s.append('User Identity Negotiation Response: {0!s}'.format(usr_id))
s.append('======================= END A-ASSOCIATE-AC =================='
'====')
for line in s:
LOGGER.debug(line)
[docs] @staticmethod
def debug_send_associate_rj(a_associate_rj):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-ASSOCIATE-RJ to a peer AE
Parameters
----------
a_associate_rj : pdu.A_ASSOCIATE_RJ
The A-ASSOCIATE-RJ PDU instance
"""
LOGGER.info("Association Rejected")
[docs] @staticmethod
def debug_send_associate_rq(a_associate_rq):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-ASSOCIATE-RQ to
a peer AE
The default implementation is used for logging debugging information
Parameters
----------
a_associate_rq : pdu.A_ASSOCIATE_RQ
The A-ASSOCIATE-RQ PDU instance to be encoded and sent
"""
# Shorthand
pdu = a_associate_rq
app_context = pdu.application_context_name.title()
pres_contexts = pdu.presentation_context
user_info = pdu.user_information
s = ['Request Parameters:']
s.append('====================== BEGIN A-ASSOCIATE-RQ ================'
'=====')
s.append('Our Implementation Class UID: '
'{0!s}'.format(user_info.implementation_class_uid))
if user_info.implementation_version_name:
s.append(
'Our Implementation Version Name: {0!s}'.format(
user_info.implementation_version_name.decode('ascii')
)
)
s.append('Application Context Name: {0!s}'.format(app_context))
s.append('Calling Application Name: '
'{0!s}'.format(pdu.calling_ae_title.decode('ascii')))
s.append('Called Application Name: '
'{0!s}'.format(pdu.called_ae_title.decode('ascii')))
s.append('Our Max PDU Receive Size: '
'{0!s}'.format(user_info.maximum_length))
## Presentation Contexts
if len(pres_contexts) == 1:
s.append('Presentation Context:')
else:
s.append('Presentation Contexts:')
for context in pres_contexts:
s.append(' Context ID: {0!s} '
'(Proposed)'.format((context.context_id)))
s.append(' Abstract Syntax: ='
'{0!s}'.format(context.abstract_syntax.name))
# Add SCP/SCU Role Selection Negotiation
# Roles are: SCU, SCP/SCU, SCP, Default
if pdu.user_information.role_selection:
try:
role = pdu.user_information.role_selection[
context.abstract_syntax
]
roles = []
if role.scp_role:
roles.append('SCP')
if role.scu_role:
roles.append('SCU')
scp_scu_role = '/'.join(roles)
except KeyError:
scp_scu_role = 'Default'
else:
scp_scu_role = 'Default'
s.append(' Proposed SCP/SCU Role: {0!s}'.format(scp_scu_role))
# Transfer Syntaxes
if len(context.transfer_syntax) == 1:
s.append(' Proposed Transfer Syntax:')
else:
s.append(' Proposed Transfer Syntaxes:')
for ts in context.transfer_syntax:
s.append(' ={0!s}'.format(ts.name))
## Extended Negotiation
if pdu.user_information.ext_neg:
s.append('Requested Extended Negotiation:')
for item in pdu.user_information.ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.uid))
#s.append(' Application Information, length: %d bytes'
# %len(item.app_info))
app_info = pretty_bytes(item.app_info)
app_info[0] = '[' + app_info[0][1:]
app_info[-1] = app_info[-1] + ' ]'
for line in app_info:
s.append(' {0!s}'.format(line))
else:
s.append('Requested Extended Negotiation: None')
## Common Extended Negotiation
if pdu.user_information.common_ext_neg:
s.append('Requested Common Extended Negotiation:')
for item in pdu.user_information.common_ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.sop_class_uid.name))
s.append(
" Service Class: ={0!s}"
.format(item.service_class_uid.name)
)
related_uids = item.related_general_sop_class_identification
if related_uids:
s.append(' Related General SOP Class(es):')
for sub_field in related_uids:
s.append(' ={0!s}'.format(sub_field.name))
else:
s.append(' Related General SOP Classes: None')
else:
s.append('Requested Common Extended Negotiation: None')
## Asynchronous Operations Window Negotiation
async_ops = pdu.user_information.async_ops_window
if async_ops is not None:
s.append('Requested Asynchronous Operations Window Negotiation:')
s.append(
" Maximum Invoked Operations: {}"
.format(async_ops.maximum_number_operations_invoked)
)
s.append(
" Maximum Performed Operations: {}"
.format(async_ops.maximum_number_operations_performed)
)
else:
s.append(
"Requested Asynchronous Operations Window Negotiation: None"
)
## User Identity
if user_info.user_identity is not None:
usid = user_info.user_identity
s.append('Requested User Identity Negotiation:')
s.append(' Authentication Mode: {0:d} - '
'{1!s}'.format(usid.id_type, usid.id_type_str))
if usid.id_type == 1:
s.append(' Username: '
'[{0!s}]'.format(usid.primary.decode('utf-8')))
elif usid.id_type == 2:
s.append(' Username: '
'[{0!s}]'.format(usid.primary.decode('utf-8')))
s.append(' Password: '
'[{0!s}]'.format(usid.secondary.decode('utf-8')))
elif usid.id_type == 3:
s.append(' Kerberos Service Ticket (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
elif usid.id_type == 4:
s.append(' SAML Assertion (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
elif usid.id_type == 5:
s.append(' JSON Web Token (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
if usid.response_requested:
s.append(' Positive Response Requested: Yes')
else:
s.append(' Positive Response Requested: No')
else:
s.append('Requested User Identity Negotiation: None')
s.append('======================= END A-ASSOCIATE-RQ =================='
'====')
for line in s:
LOGGER.debug(line)
[docs] @staticmethod
def debug_send_data_tf(p_data_tf):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an P-DATA-TF to a peer AE
Parameters
----------
a_release_rq : pdu.P_DATA_TF
The P-DATA-TF PDU instance
"""
pass
[docs] @staticmethod
def debug_send_release_rp(a_release_rp):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-RELEASE-RP to a peer AE
Parameters
----------
a_release_rp : pdu.A_RELEASE_RP
The A-RELEASE-RP PDU instance
"""
pass
[docs] @staticmethod
def debug_send_release_rq(a_release_rq):
"""
Placeholder for a function callback. Function will be called
immediately prior to encoding and sending an A-RELEASE-RQ to a peer AE
Parameters
----------
a_release_rq : pdu.A_RELEASE_RQ
The A-RELEASE-RQ PDU instance
"""
pass
[docs] @staticmethod
def debug_receive_abort(a_abort):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-ABORT
Parameters
----------
a_abort : pdu.A_ABORT_RQ
The A-ABORT PDU instance
"""
s = ['Abort Parameters:']
s.append('========================== BEGIN A-ABORT ===================='
'====')
s.append('Abort Source: {0!s}'.format(a_abort.source_str))
s.append('Abort Reason: {0!s}'.format(a_abort.reason_str))
s.append('=========================== END A-ABORT ====================='
'====')
for line in s:
LOGGER.debug(line)
[docs] @staticmethod
def debug_receive_associate_ac(a_associate_ac):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-ASSOCIATE-AC
The default implementation is used for logging debugging information
Most of this should be moved to on_association_accepted()
Parameters
----------
a_associate_ac : pdu.A_ASSOCIATE_AC
The A-ASSOCIATE-AC PDU instance
"""
# Shorthand
assoc_ac = a_associate_ac
app_context = assoc_ac.application_context_name.title()
pres_contexts = assoc_ac.presentation_context
user_info = assoc_ac.user_information
async_ops = user_info.async_ops_window
roles = user_info.role_selection
their_class_uid = 'unknown'
their_version = b'unknown'
if user_info.implementation_class_uid:
their_class_uid = user_info.implementation_class_uid
if user_info.implementation_version_name:
their_version = user_info.implementation_version_name
s = ['Accept Parameters:']
s.append('====================== BEGIN A-ASSOCIATE-AC ================'
'=====')
s.append('Their Implementation Class UID: {0!s}'
.format(their_class_uid))
s.append('Their Implementation Version Name: {0!s}'
.format(their_version.decode('ascii')))
s.append('Application Context Name: {0!s}'.format(app_context))
s.append('Calling Application Name: {0!s}'
.format(assoc_ac.calling_ae_title.decode('ascii')))
s.append('Called Application Name: {0!s}'
.format(assoc_ac.called_ae_title.decode('ascii')))
s.append('Their Max PDU Receive Size: {0!s}'
.format(user_info.maximum_length))
s.append('Presentation Contexts:')
for item in pres_contexts:
s.append(' Context ID: {0!s} ({1!s})'
.format(item.context_id, item.result_str))
if item.result == 0:
s.append(' Accepted Transfer Syntax: ={0!s}'
.format(item.transfer_syntax.name))
## Role Selection
if roles:
s.append("Accepted Role Selection:")
for uid in sorted(roles.keys()):
s.append(" SOP Class: ={}".format(uid.name))
str_roles = []
if roles[uid].scp_role:
str_roles.append('SCP')
if roles[uid].scu_role:
str_roles.append('SCU')
str_roles = '/'.join(str_roles)
s.append(" SCP/SCU Role: {}".format(str_roles))
## Extended Negotiation
if user_info.ext_neg:
s.append('Accepted Extended Negotiation:')
for item in user_info.ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.uid))
app_info = pretty_bytes(item.app_info)
app_info[0] = '[' + app_info[0][1:]
app_info[-1] = app_info[-1] + ' ]'
for line in app_info:
s.append(' {0!s}'.format(line))
else:
s.append('Accepted Extended Negotiation: None')
## Asynchronous Operations
if async_ops:
s.append(
"Accepted Asynchronous Operations Window Negotiation:"
)
s.append(
" Maximum Invoked Operations: {}"
.format(async_ops.maximum_number_operations_invoked)
)
s.append(
" Maximum Performed Operations: {}"
.format(async_ops.maximum_number_operations_performed)
)
else:
s.append(
"Accepted Asynchronous Operations Window Negotiation: None"
)
## User Identity
usr_id = 'Yes' if user_info.user_identity is not None else 'None'
s.append('User Identity Negotiation Response: {0!s}'.format(usr_id))
s.append('======================= END A-ASSOCIATE-AC =================='
'====')
for line in s:
LOGGER.debug(line)
LOGGER.info('Association Accepted')
[docs] @staticmethod
def debug_receive_associate_rj(a_associate_rj):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-ASSOCIATE-RJ
Parameters
----------
a_associate_rj : pdu.A_ASSOCIATE_RJ
The A-ASSOCIATE-RJ PDU instance
"""
# Shorthand
assoc_rj = a_associate_rj
s = ['Reject Parameters:']
s.append('====================== BEGIN A-ASSOCIATE-RJ ================='
'=====')
s.append('Result: {0!s}'.format(assoc_rj.result_str))
s.append('Source: {0!s}'.format(assoc_rj.source_str))
s.append('Reason: {0!s}'.format(assoc_rj.reason_str))
s.append('======================= END A-ASSOCIATE-RJ =================='
'====')
for line in s:
LOGGER.debug(line)
[docs] @staticmethod
def debug_receive_associate_rq(a_associate_rq):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-ASSOCIATE-RQ
Parameters
----------
a_associate_rq : pdu.A_ASSOCIATE_RQ
The A-ASSOCIATE-RQ PDU instance
"""
LOGGER.info("Association Received")
# Shorthand
pdu = a_associate_rq
app_context = pdu.application_context_name.title()
pres_contexts = pdu.presentation_context
user_info = pdu.user_information
#responding_ae = 'resp. AP Title'
their_class_uid = 'unknown'
their_version = b'unknown'
if user_info.implementation_class_uid:
their_class_uid = user_info.implementation_class_uid
if user_info.implementation_version_name:
their_version = user_info.implementation_version_name
s = ['Request Parameters:']
s.append('====================== BEGIN A-ASSOCIATE-RQ ================'
'=====')
s.append('Their Implementation Class UID: {0!s}'
.format(their_class_uid))
s.append('Their Implementation Version Name: {0!s}'
.format(their_version.decode('ascii')))
s.append('Application Context Name: {0!s}'
.format(app_context))
s.append('Calling Application Name: {0!s}'
.format(pdu.calling_ae_title.decode('ascii')))
s.append('Called Application Name: {0!s}'
.format(pdu.called_ae_title.decode('ascii')))
s.append('Their Max PDU Receive Size: {0!s}'
.format(user_info.maximum_length))
## Presentation Contexts
if len(pres_contexts) == 1:
s.append('Presentation Context:')
else:
s.append('Presentation Contexts:')
for context in pres_contexts:
s.append(' Context ID: {0!s} '
'(Proposed)'.format((context.context_id)))
s.append(' Abstract Syntax: ='
'{0!s}'.format(context.abstract_syntax.name))
# Add SCP/SCU Role Selection Negotiation
# Roles are: SCU, SCP/SCU, SCP, Default
if pdu.user_information.role_selection:
try:
role = pdu.user_information.role_selection[
context.abstract_syntax
]
roles = []
if role.scp_role:
roles.append('SCP')
if role.scu_role:
roles.append('SCU')
scp_scu_role = '/'.join(roles)
except KeyError:
scp_scu_role = 'Default'
else:
scp_scu_role = 'Default'
s.append(' Proposed SCP/SCU Role: {0!s}'.format(scp_scu_role))
# Transfer Syntaxes
if len(context.transfer_syntax) == 1:
s.append(' Proposed Transfer Syntax:')
else:
s.append(' Proposed Transfer Syntaxes:')
for ts in context.transfer_syntax:
s.append(' ={0!s}'.format(ts.name))
## Extended Negotiation
if pdu.user_information.ext_neg:
s.append('Requested Extended Negotiation:')
for item in pdu.user_information.ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.uid))
#s.append(' Application Information, length: %d bytes'
# %len(item.app_info))
app_info = pretty_bytes(item.app_info)
app_info[0] = '[' + app_info[0][1:]
app_info[-1] = app_info[-1] + ' ]'
for line in app_info:
s.append(' {0!s}'.format(line))
else:
s.append('Requested Extended Negotiation: None')
## Common Extended Negotiation
if pdu.user_information.common_ext_neg:
s.append('Requested Common Extended Negotiation:')
for item in pdu.user_information.common_ext_neg:
s.append(' SOP Class: ={0!s}'.format(item.sop_class_uid.name))
s.append(
" Service Class: ={0!s}"
.format(item.service_class_uid.name)
)
related_uids = item.related_general_sop_class_identification
if related_uids:
s.append(' Related General SOP Class(es):')
for sub_field in related_uids:
s.append(' ={0!s}'.format(sub_field.name))
else:
s.append(' Related General SOP Classes: None')
else:
s.append('Requested Common Extended Negotiation: None')
## Asynchronous Operations Window Negotiation
async_ops = pdu.user_information.async_ops_window
if async_ops is not None:
s.append('Requested Asynchronous Operations Window Negotiation:')
s.append(
" Maximum Invoked Operations: {}"
.format(async_ops.maximum_number_operations_invoked)
)
s.append(
" Maximum Performed Operations: {}"
.format(async_ops.maximum_number_operations_performed)
)
else:
s.append(
"Requested Asynchronous Operations Window Negotiation: None"
)
## User Identity
if user_info.user_identity is not None:
usid = user_info.user_identity
s.append('Requested User Identity Negotiation:')
s.append(' Authentication Mode: {0:d} - {1!s}'
.format(usid.id_type, usid.id_type_str))
if usid.id_type == 1:
s.append(' Username: [{0!s}]'
.format(usid.primary.decode('utf-8')))
elif usid.id_type == 2:
s.append(' Username: [{0!s}]'
.format(usid.primary.decode('utf-8')))
s.append(' Password: [{0!s}]'
.format(usid.secondary.decode('utf-8')))
elif usid.id_type == 3:
s.append(' Kerberos Service Ticket (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
elif usid.id_type == 4:
s.append(' SAML Assertion (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
elif usid.id_type == 5:
s.append(' JSON Web Token (not dumped) length: '
'{0:d}'.format(len(usid.primary)))
if usid.response_requested:
s.append(' Positive Response requested: Yes')
else:
s.append(' Positive Response requested: None')
else:
s.append('Requested User Identity Negotiation: None')
s.append('======================= END A-ASSOCIATE-RQ =================='
'====')
for line in s:
LOGGER.debug(line)
[docs] @staticmethod
def debug_receive_data_tf(p_data_tf):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an P-DATA-TF
Parameters
----------
p_data_tf : pdu.P_DATA_TF
The P-DATA-TF PDU instance
"""
pass
[docs] @staticmethod
def debug_receive_release_rp(a_release_rp):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-RELEASE-RP
Parameters
----------
a_release_rp : pdu.A_RELEASE_RP
The A-RELEASE-RP PDU instance
"""
pass
[docs] @staticmethod
def debug_receive_release_rq(a_release_rq):
"""
Placeholder for a function callback. Function will be called
immediately after receiving and decoding an A-RELEASE-RQ
Parameters
----------
a_release_rq : pdu.A_RELEASE_RQ
The A-RELEASE-RQ PDU instance
"""
pass