Source code for filter_functions.util

# -*- coding: utf-8 -*-
# =============================================================================
#     filter_functions
#     Copyright (C) 2019 Quantum Technology Group, RWTH Aachen University
#
#     This program is free software: you can redistribute it and/or modify
#     it under the terms of the GNU General Public License as published by
#     the Free Software Foundation, either version 3 of the License, or
#     (at your option) any later version.
#
#     This program is distributed in the hope that it will be useful,
#     but WITHOUT ANY WARRANTY; without even the implied warranty of
#     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#     GNU General Public License for more details.
#
#     You should have received a copy of the GNU General Public License
#     along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#     Contact email: tobias.hangleiter@rwth-aachen.de
# =============================================================================
"""
This module provides various helper functions.

Functions
---------
:func:`abs2`
    Absolute value squared
:func:`get_indices_from_identifiers`
    The the indices of a subset of identifiers within a list of
    identifiers.
:func:`tensor`
    Fast, flexible tensor product of an arbitrary number of inputs using
    :func:`~numpy.einsum`
:func:`tensor_insert`
    For an array that is known to be a tensor product, insert arrays at
    a given position in the product chain
:func:`tensor_merge`
    For two arrays that are tensor products of known dimensions, merge
    them at arbitary positions in the product chain
:func:`tensor_transpose`
    For a tensor product, transpose the order of the constituents in the
    product chain
:func:`mdot`
    Multiple matrix product
:func:`remove_float_errors`
    Set entries whose absolute value is below a certain threshold to
    zero
:func:`oper_equiv`
    Determine if two vectors or operators are equal up to a global phase
:func:`dot_HS`
    Hilbert-Schmidt inner product
:func:`get_sample_frequencies`
    Get frequencies with typical infrared and ultraviolet cutoffs for a
    ``PulseSequence``
:func:`progressbar`
    A progress bar for loops. Uses tqdm if available and a simple custom
    one if not.
:func:`hash_array_along_axis`
    Return a list of hashes along a given axis
:func:`all_array_equal`
    Check if all arrays in an iterable are equal

Exceptions
----------
:class:`CalculationError`
    Exception raised if trying to fetch the pulse correlation function
    when it was not computed during concatenation

"""
import functools
import inspect
import operator
import string
from itertools import zip_longest
from typing import Callable, Iterable, List, Optional, Sequence, Tuple, Union

import numpy as np
from numpy import ndarray

from .types import Operator, State

try:
    import ipynbname
    _NOTEBOOK_NAME = ipynbname.name()
except (ImportError, IndexError, FileNotFoundError):
    _NOTEBOOK_NAME = ''

if _NOTEBOOK_NAME:
    from tqdm.notebook import tqdm as _tqdm
else:
    # Either not running notebook or not able to determine
    from tqdm import tqdm as _tqdm

__all__ = ['paulis', 'abs2', 'all_array_equal', 'dot_HS', 'get_sample_frequencies',
           'hash_array_along_axis', 'mdot', 'oper_equiv', 'progressbar', 'remove_float_errors',
           'tensor', 'tensor_insert', 'tensor_merge', 'tensor_transpose']

# Pauli matrices
paulis = np.array([
    [[1, 0],
     [0, 1]],
    [[0, 1],
     [1, 0]],
    [[0, -1j],
     [1j, 0]],
    [[1, 0],
     [0, -1]],
])


[docs]def abs2(x: ndarray) -> ndarray: r""" Fast function to calculate the absolute value squared, .. math:: |\cdot|^2 := \Re(\cdot)^2 + \Im(\cdot)^2 Equivalent to:: np.abs(x)**2 """ return x.real**2 + x.imag**2
def cexp(x: ndarray, out=None, where=True) -> ndarray: r"""Fast complex exponential. Parameters ---------- x: ndarray Argument of the complex exponential :math:`\exp(i x)`. out: ndarray, None, or tuple of ndarray and None, optional A location into which the result is stored. See :func:`numpy.ufunc`. where: array_like, optional This condition is broadcast over the input. See :func:`numpy.ufunc`. Returns ------- y: ndarray Complex exponential :math:`y = \exp(i x)`. References ---------- https://software.intel.com/en-us/forums/intel-distribution-for-python/topic/758148 """ out = np.empty(x.shape, dtype=np.complex128) if out is None else out out.real = np.cos(x, out=out.real, where=where) out.imag = np.sin(x, out=out.imag, where=where) return out def parse_optional_parameters(**allowed_kwargs: Sequence) -> Callable: """Decorator factory to parse optional parameter with certain legal values. For ``allowed_kwargs = {name: allowed, ...}``: If the parameter value corresponding to ``name`` (either in args or kwargs of the decorated function) is not contained in ``allowed`` a ``ValueError`` is raised. """ def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): parameters = inspect.signature(func).parameters for name, allowed in allowed_kwargs.items(): idx = tuple(parameters).index(name) try: value = args[idx] except IndexError: value = kwargs.get(name, parameters[name].default) if value not in allowed: raise ValueError(f"Invalid value for {name}: {value}. " + f"Should be one of {allowed}.") return func(*args, **kwargs) return wrapper return decorator def parse_spectrum(spectrum: Sequence, omega: Sequence, idx: Sequence) -> ndarray: error = 'Spectrum should be of shape {}, not {}.' shape = (len(idx),)*(spectrum.ndim - 1) + (len(omega),) try: spectrum = np.broadcast_to(spectrum, shape) except ValueError as broadcast_error: raise ValueError(error.format(shape, spectrum.shape)) from broadcast_error if spectrum.ndim == 3 and not np.allclose(spectrum, spectrum.conj().swapaxes(0, 1)): raise ValueError('Cross-spectra given but not Hermitian along first two axes') elif spectrum.ndim > 3: raise ValueError(f'Expected spectrum to have < 4 dimensions, not {spectrum.ndim}') return spectrum def parse_operators(opers: Sequence[Operator], err_loc: str) -> List[ndarray]: """Parse a sequence of operators and convert to ndarray. Parameters ---------- opers: Sequence[Operator] Sequence of operators. err_loc: str Some cosmetics for the exceptions to be raised. Raises ------ TypeError If any operator is not a valid type. ValueError If not all operators are 2d and square. Returns ------- parse_opers: ndarray, shape (len(opers), *opers[0].shape) The parsed ndarray. """ parsed_opers = [] for oper in opers: if isinstance(oper, ndarray): parsed_opers.append(oper.squeeze()) elif hasattr(oper, 'full'): # qutip.Qobj parsed_opers.append(oper.full()) elif hasattr(oper, 'todense'): # sparse object parsed_opers.append(oper.todense()) elif hasattr(oper, 'data') and hasattr(oper, 'dexp'): # qopt DenseMatrix parsed_opers.append(oper.data) else: raise TypeError(f'Expected operators in {err_loc} to be NumPy arrays or QuTiP Qobjs!') parsed_opers = np.asarray(parsed_opers, dtype=complex) # Check correct dimensions of the operators if parsed_opers.ndim > 3: raise ValueError(f'Expected operators in {err_loc} to be two-dimensional!') if len(set(parsed_opers.shape[-2:])) != 1: raise ValueError(f'Expected operators in {err_loc} to be square!') return parsed_opers def _tensor_product_shape(shape_A: Sequence[int], shape_B: Sequence[int], rank: int): """Get shape of the tensor product between A and B of rank rank""" broadcast_shape = () # Loop over dimensions from last to first, filling the 'shorter' shape # with 1's once it is exhausted for dims in zip_longest(shape_A[-rank-1::-1], shape_B[-rank-1::-1], fillvalue=1): if 1 in dims: # Broadcast 1-d of argument to dimension of other broadcast_shape = (max(dims),) + broadcast_shape elif len(set(dims)) == 1: # Both arguments have same dimension on axis. broadcast_shape = dims[:1] + broadcast_shape else: raise ValueError(f'Incompatible shapes {shape_A} and {shape_B} ' + f'for tensor product of rank {rank}.') # Shape of the actual tensor product is product of each dimension, # again broadcasting if need be tensor_shape = tuple( functools.reduce(operator.mul, dimensions) for dimensions in zip_longest(shape_A[:-rank-1:-1], shape_B[:-rank-1:-1], fillvalue=1) )[::-1] return broadcast_shape + tensor_shape def _parse_dims_arg(name: str, dims: Sequence[Sequence[int]], rank: int) -> None: """Check if dimension arg for a tensor_* function is correct format""" if not len(dims) == rank: raise ValueError(f'{name}_dims should be of length rank = {rank}, not {len(dims)}') if not len(set(len(dim) for dim in dims)) == 1: # Not all nested lists the same length as required raise ValueError(f'Require all lists in {name}_dims to be of same length!') def get_indices_from_identifiers(all_identifiers: Sequence[str], identifiers: Union[None, str, Sequence[str]]) -> Sequence[int]: """Get the indices of operators for given identifiers. Parameters ---------- all_identifiers: sequence of str All available identifiers. identifiers: str or sequence of str The identifiers whose indices to get. """ identifier_to_index_table = {identifier: index for index, identifier in enumerate(all_identifiers)} if identifiers is None: inds = np.arange(len(all_identifiers)) else: try: if isinstance(identifiers, str): inds = np.array([identifier_to_index_table[identifiers]]) else: inds = np.array([identifier_to_index_table[identifier] for identifier in identifiers]) except KeyError: raise ValueError('Invalid identifiers given. All available ones ' + f'are: {all_identifiers}') return inds
[docs]def tensor(*args, rank: int = 2, optimize: Union[bool, str] = False) -> ndarray: """ Fast, flexible tensor product using einsum. The product is taken over the last *rank* axes and broadcast over the remaining axes which thus need to follow numpy broadcasting rules. Note that vectors are treated as rank 2 tensors with shape (1, x) or (x, 1). For example, the following shapes are compatible: - ``rank == 2`` (e.g. matrices or vectors):: (a, b, c, d, d), (a, b, c, e, e) -> (a, b, c, d*e, d*e) (a, b, c), (a, d, e) -> (a, b*d, c*e) (a, b), (c, d, e) -> (c, a*d, b*e) (1, a), (b, 1, c) -> (b, 1, a*c) - ``rank == 1``:: (a, b), (a, c) -> (a, b*c) (a, b, 1), (a, c) -> (a, b, c) Parameters ---------- args: array_like The elements of the tensor product rank: int, optional (default: 2) The rank of the tensors. E.g., for a Kronecker product between two matrices ``rank == 2``. The remaining axes are broadcast over. optimize: bool|str, optional (default: False) Optimize the tensor contraction order. Passed through to :func:`numpy.einsum`. Examples -------- >>> Z = np.diag([1, -1]) >>> np.array_equal(tensor(Z, Z), np.kron(Z, Z)) True >>> A, B = np.arange(2), np.arange(2, 5) >>> tensor(A, B, rank=1) array([[0, 0, 0, 2, 3, 4]]) >>> args = np.random.randn(4, 10, 3, 2) >>> result = tensor(*args, rank=1) >>> result.shape == (10, 3, 2**4) True >>> result = tensor(*args, rank=2) >>> result.shape == (10, 3**4, 2**4) True >>> A, B = np.random.randn(1, 3), np.random.randn(3, 4) >>> result = tensor(A, B) >>> result.shape == (1*3, 3*4) True >>> A, B = np.random.randn(3, 1, 2), np.random.randn(2, 2, 2) >>> try: ... result = tensor(A, B, rank=2) ... except ValueError as err: # cannot broadcast over axis 0 ... print(err) Incompatible shapes (3, 1, 2) and (2, 2, 2) for tensor product of rank 2. >>> result = tensor(A, B, rank=3) >>> result.shape == (3*2, 1*2, 2*2) True See Also -------- numpy.kron: NumPy tensor product. tensor_insert: Insert array at given position in tensor product chain. tensor_merge: Merge tensor product chains. tensor_transpose: Transpose the order of a tensor product chain. """ chars = string.ascii_letters # All the subscripts we need A_chars = chars[:rank] B_chars = chars[rank:2*rank] subscripts = '...{},...{}->...{}'.format( A_chars, B_chars, ''.join(i + j for i, j in zip(A_chars, B_chars)) ) def binary_tensor(A, B): """Compute the Kronecker product of two tensors""" # Add dimensions so that each arg has at least ndim == rank while A.ndim < rank: A = A[None, :] while B.ndim < rank: B = B[None, :] outshape = _tensor_product_shape(A.shape, B.shape, rank) return np.einsum(subscripts, A, B, optimize=optimize).reshape(outshape) # Compute the tensor products in a binary tree-like structure, calculating # the product of two leaves and working up. This is more memory-efficient # than reduce(binary_tensor, args) which computes the products # left-to-right. n = len(args) bit = n % 2 while n > 1: args = args[:bit] + tuple(binary_tensor(*args[i:i+2]) for i in range(bit, n, 2)) n = len(args) bit = n % 2 return args[0]
[docs]def tensor_insert(arr: ndarray, *args, pos: Union[int, Sequence[int]], arr_dims: Sequence[Sequence[int]], rank: int = 2, optimize: Union[bool, str] = False) -> ndarray: r""" For a tensor product *arr*, insert *args* into the product chain at *pos*. E.g, if :math:`\verb|arr|\equiv A\otimes B\otimes C` and :math:`\verb|pos|\equiv 2`, the result will be the tensor product .. math:: A\otimes B\otimes\left[\bigotimes_{X\in\verb|args|}X\right] \otimes C. This function works in a similar way to :func:`numpy.insert` and the following would be functionally equivalent in the case that the constituent tensors of the product *arr* are known: >>> tensor_insert(tensor(*arrs, rank=rank), *args, pos=pos, arr_dims=..., ... rank=rank) >>> tensor(*np.insert(arrs, pos, args, axis=0), rank=rank) Parameters ---------- arr: ndarray The tensor product in whose chain the other args should be inserted *args: ndarray The tensors to be inserted in the product chain pos: int|sequence of ints The position(s) at which the args are inserted in the product chain. If an int and ``len(args) > 1``, it is repeated so that all args are inserted in a row. If a sequence, it should indicate the indices in the original tensor product chain that led to *arr* before which *args* should be inserted. arr_dims: array_like, shape (rank, n_const) The last *rank* dimensions of the *n_const* constituent tensors of the tensor product *arr* as a list of lists with the list at position *i* containing the *i*-th relevant dimension of all args. Since the remaing axes are broadcast over, their shape is irrelevant. For example, if ``arr = tensor(a, b, c, rank=2)`` and ``a,b,c`` have shapes ``(2, 3, 4), (5, 2, 2, 1), (2, 2)``, ``arr_dims = [[3, 2, 2], [4, 1, 2]]``. rank: int, optional (default: 2) The rank of the tensors. E.g., for a Kronecker product between two vectors, ``rank == 1``, and between two matrices ``rank == 2``. The remaining axes are broadcast over. optimize: bool|str, optional (default: False) Optimize the tensor contraction order. Passed through to :func:`numpy.einsum`. Examples -------- >>> I, X, Y, Z = paulis >>> arr = tensor(X, I) >>> r = tensor_insert(arr, Y, Z, arr_dims=[[2, 2], [2, 2]], pos=0) >>> np.allclose(r, tensor(Y, Z, X, I)) True >>> r = tensor_insert(arr, Y, Z, arr_dims=[[2, 2], [2, 2]], pos=1) >>> np.allclose(r, tensor(X, Y, Z, I)) True >>> r = tensor_insert(arr, Y, Z, arr_dims=[[2, 2], [2, 2]], pos=2) >>> np.allclose(r, tensor(X, I, Y, Z)) True Other ranks and different dimensions: >>> from numpy.random import randn >>> A, B, C = randn(2, 3, 1, 2), randn(2, 2, 2, 2), randn(3, 2, 1) >>> arr = tensor(A, C, rank=3) >>> r = tensor_insert(arr, B, pos=1, rank=3, ... arr_dims=[[3, 3], [1, 2], [2, 1]]) >>> np.allclose(r, tensor(A, B, C, rank=3)) True >>> arrs, args = randn(2, 2, 2), randn(2, 2, 2) >>> arr_dims = [[2, 2], [2, 2]] >>> r = tensor_insert(tensor(*arrs), *args, pos=(0, 1), arr_dims=arr_dims) >>> np.allclose(r, tensor(args[0], arrs[0], args[1], arrs[1])) True >>> r = tensor_insert(tensor(*arrs), *args, pos=(0, 0), arr_dims=arr_dims) >>> np.allclose(r, tensor(*args, *arrs)) True >>> r = tensor_insert(tensor(*arrs), *args, pos=(1, 2), arr_dims=arr_dims) >>> np.allclose(r, tensor(*np.insert(arrs, (1, 2), args, axis=0))) True See Also -------- numpy.insert: NumPy array insertion with similar syntax. numpy.kron: NumPy tensor product. tensor_insert: Insert array at given position in tensor product chain. tensor_merge: Merge tensor product chains. tensor_transpose: Transpose the order of a tensor product chain. """ if len(args) == 0: raise ValueError('Require nonzero number of args!') if np.issubdtype(type(pos), np.integer): # super awkward type check, thanks numpy! pos = (pos,) if len(args) > 1: # Inserting all args at same position, perform their tensor product # using tensor and insert the result instead of iteratively insert # one by one args = (tensor(*args, rank=rank, optimize=optimize),) else: if not len(pos) == len(args): raise ValueError('Expected pos to be either an int or a sequence of the same length ' + f'as the number of args, not length {len(pos)}') _parse_dims_arg('arr', arr_dims, rank) def _tensor_insert_subscripts(ndim, pos, rank): """Get einsum string for the contraction""" ins_chars = string.ascii_letters[:rank] arr_chars = string.ascii_letters[rank:(ndim+1)*rank] subscripts = '...{},...{}->...{}'.format( ins_chars, arr_chars, arr_chars[:pos] + ''.join( ins_chars[i] + arr_chars[pos+i*ndim:pos+(i+1)*ndim] for i in range(rank) ) ) return subscripts def single_tensor_insert(arr, ins, arr_dims, pos): """Insert a single tensor *ins* into *arr* at position *pos*.""" subscripts = _tensor_insert_subscripts(len(arr_dims[0]), pos, rank) outshape = _tensor_product_shape(ins.shape, arr.shape, rank) # Need to reshape arr to the rank*ndim-dimensional shape that's the # output of the regular tensor einsum call flat_arr_dims = [dim for axis in arr_dims for dim in axis] reshaped_arr = arr.reshape(*arr.shape[:-rank], *flat_arr_dims) result = np.einsum(subscripts, ins, reshaped_arr, optimize=optimize).reshape(outshape) return result # Insert args one after the other, starting at lowest index result = arr.copy() # Make a deep copy of arr_dims and pos as we modify them carr_dims = [list(axis[:]) for axis in arr_dims] cpos = list(pos).copy() # Number of constituent tensors of the tensor product arr ndim = len(arr_dims[0]) divs, pos = zip(*[divmod(p, ndim) if p != ndim else (0, p) for p in pos]) for i, (p, div, arg, arg_counter) in enumerate(sorted( zip_longest(pos, divs, args, range(len(args)), fillvalue=pos[0]), key=operator.itemgetter(0))): if div not in (-1, 0): raise IndexError(f'Invalid position {cpos[i]} specified. Must ' + f'be between -{ndim} and {ndim}.') # Insert argument arg at position p+i (since every iteration the index # shifts by 1) try: result = single_tensor_insert(result, arg, carr_dims, p+i) except ValueError as err: raise ValueError(f'Could not insert arg {arg_counter} with shape {result.shape} ' + f'into the array with shape {arg.shape} at position {p}.') from err # Update arr_dims for axis, d in zip(carr_dims, arg.shape[-rank:]): axis.insert(p, d) return result
[docs]def tensor_merge(arr: ndarray, ins: ndarray, pos: Sequence[int], arr_dims: Sequence[Sequence[int]], ins_dims: Sequence[Sequence[int]], rank: int = 2, optimize: Union[bool, str] = False) -> ndarray: r""" For two tensor products *arr* and *ins*, merge *ins* into the product chain at indices *pos*. E.g, if :math:`\verb|arr|\equiv A\otimes B\otimes C`, :math:`\verb|ins|\equiv D\otimes E`, and :math:`\verb|pos|\equiv [1, 2]`, the result will be the tensor product .. math:: A\otimes D\otimes B\otimes E\otimes C. This function works in a similar way to :func:`numpy.insert` and :func:`tensor_insert`. Parameters ---------- arr: ndarray The tensor product in whose chain the other args should be inserted ins: ndarray The tensor product to be inserted in the product chain pos: sequence of ints The positions at which the constituent tensors of *ins* are inserted in the product chain. Should indicate the indices in the original tensor product chain that led to *arr* before which the constituents of *ins* should be inserted. arr_dims: array_like, shape (rank, n_const) The last *rank* dimensions of the *n_const* constituent tensors of the tensor product *arr* as a list of lists with the list at position *i* containing the *i*-th relevant dimension of all args. Since the remaing axes are broadcast over, their shape is irrelevant. For example, if ``arr = tensor(a, b, c, rank=2)`` and ``a,b,c`` have shapes ``(2, 3, 4), (5, 2, 2, 1), (2, 2)``, ``arr_dims = [[3, 2, 2], [4, 1, 2]]``. ins_dims: array_like, shape (rank, n_const) The last *rank* dimensions of the *n_const* constituent tensors of the tensor product *ins* as a list of lists with the list at position *i* containing the *i*-th relevant dimension of *ins*. Since the remaing axes are broadcast over, their shape is irrelevant. rank: int, optional (default: 2) The rank of the tensors. E.g., for a Kronecker product between two vectors, ``rank == 1``, and between two matrices ``rank == 2``. The remaining axes are broadcast over. optimize: bool|str, optional (default: False) Optimize the tensor contraction order. Passed through to :func:`numpy.einsum`. Examples -------- >>> I, X, Y, Z = paulis >>> arr = tensor(X, Y, Z) >>> ins = tensor(I, I) >>> r1 = tensor_merge(arr, ins, pos=[1, 2], arr_dims=[[2]*3, [2]*3], ... ins_dims=[[2]*2, [2]*2]) >>> np.allclose(r1, tensor(X, I, Y, I, Z)) True >>> r2 = tensor_merge(ins, arr, pos=[0, 1, 2], arr_dims=[[2]*2, [2]*2], ... ins_dims=[[2]*3, [2]*3]) >>> np.allclose(r1, r2) True :func:`tensor_insert` can provide the same functionality in some cases: >>> arr = tensor(Y, Z) >>> ins = tensor(I, X) >>> r1 = tensor_merge(arr, ins, pos=[0, 0], arr_dims=[[2]*2, [2]*2], ... ins_dims=[[2]*2, [2]*2]) >>> r2 = tensor_insert(arr, I, X, pos=[0, 0], arr_dims=[[2]*2, [2]*2]) >>> np.allclose(r1, r2) True Also tensors of rank other than 2 and numpy broadcasting are supported: >>> arr = np.random.randn(2, 10, 3, 4) >>> ins = np.random.randn(2, 10, 3, 2) >>> r = tensor_merge(tensor(*arr, rank=1), tensor(*ins, rank=1), [0, 1], ... arr_dims=[[4, 4]], ins_dims=[[2, 2]], rank=1) >>> np.allclose(r, tensor(ins[0], arr[0], ins[1], arr[1], rank=1)) True See Also -------- numpy.insert: NumPy array insertion with similar syntax. numpy.kron: NumPy tensor product. tensor: Fast tensor product with broadcasting. tensor_insert: Insert array at given position in tensor product chain. tensor_transpose: Transpose the order of a tensor product chain. """ # Parse dimension args for arg_name, arg_dims in zip(('arr', 'ins'), (arr_dims, ins_dims)): _parse_dims_arg(arg_name, arg_dims, rank) ins_ndim = len(ins_dims[0]) arr_ndim = len(arr_dims[0]) ins_chars = string.ascii_letters[:ins_ndim*rank] arr_chars = string.ascii_letters[ins_ndim*rank:(ins_ndim+arr_ndim)*rank] out_chars = '' for r in range(rank): arr_part = arr_chars[r*arr_ndim:(r+1)*arr_ndim] ins_part = ins_chars[r*ins_ndim:(r+1)*ins_ndim] for i, (p, ins_p) in enumerate(sorted(zip(pos, ins_part))): if p != arr_ndim: div, p = divmod(p, arr_ndim) if div not in (-1, 0): raise IndexError(f'Invalid position {pos[i]} specified. Must be between ' + f'-{arr_ndim} and {arr_ndim}.') arr_part = arr_part[:p+i] + ins_p + arr_part[p+i:] out_chars += arr_part subscripts = f'...{ins_chars},...{arr_chars}->...{out_chars}' outshape = _tensor_product_shape(ins.shape, arr.shape, rank) # Need to reshape arr to the rank*ndim-dimensional shape that's the # output of the regular tensor einsum call flat_arr_dims = [dim for axis in arr_dims for dim in axis] flat_ins_dims = [dim for axis in ins_dims for dim in axis] # Catch exceptions from wrong ins/arr_dims arguments try: ins_reshaped = ins.reshape(*ins.shape[:-rank], *flat_ins_dims) except ValueError as err: raise ValueError('ins_dims not compatible with ins.shape[-rank:] = ' + f'{ins.shape[-rank:]}') from err try: arr_reshaped = arr.reshape(*arr.shape[:-rank], *flat_arr_dims) except ValueError as err: raise ValueError('arr_dims not compatible with arr.shape[-rank:] = ' + f'{arr.shape[-rank:]}') from err result = np.einsum(subscripts, ins_reshaped, arr_reshaped, optimize=optimize).reshape(outshape) return result
[docs]def tensor_transpose(arr: ndarray, order: Sequence[int], arr_dims: Sequence[Sequence[int]], rank: int = 2) -> ndarray: r""" Transpose the order of a tensor product chain. Parameters ---------- arr: ndarray The tensor product whose chain should be reordered. order: sequence of ints The transposition order. If ``arr == tensor(A, B)`` and ``order == (1, 0)``, the result will be ``tensor(B, A)``. arr_dims: array_like, shape (rank, n_const) The last *rank* dimensions of the *n_const* constituent tensors of the tensor product *arr* as a list of lists with the list at position *i* containing the *i*-th relevant dimension of all args. Since the remaing axes are broadcast over, their shape is irrelevant. For example, if ``arr = tensor(a, b, c, rank=2)`` and ``a,b,c`` have shapes ``(2, 3, 4), (5, 2, 2, 1), (2, 2)``, ``arr_dims = [[3, 2, 2], [4, 1, 2]]``. rank: int, optional (default: 2) The rank of the tensors. E.g., for a Kronecker product between two vectors, ``rank == 1``, and between two matrices ``rank == 2``. The remaining axes are broadcast over. Returns ------- transposed_arr: ndarray The tensor product *arr* with its order transposed according to *order* Examples -------- >>> I, X, Y, Z = paulis >>> arr = tensor(X, Y, Z) >>> transposed = tensor_transpose(arr, [1, 2, 0], arr_dims=[[2, 2, 2]]*2) >>> np.allclose(transposed, tensor(Y, Z, X)) True See Also -------- numpy.insert: NumPy array insertion with similar syntax. numpy.kron: NumPy tensor product. tensor: Fast tensor product with broadcasting. tensor_insert: Insert array at given position in tensor product chain. tensor_merge: Merge tensor product chains. """ _parse_dims_arg('arr', arr_dims, rank) ndim = len(arr_dims[0]) # Number of axes that are broadcast over n_broadcast = len(arr.shape[:-rank]) transpose_axes = ([i for i in range(n_broadcast)] + [n_broadcast + r*ndim + o for r in range(rank) for o in order]) # Need to reshape arr to the rank*ndim-dimensional shape that's the # output of the regular tensor einsum call flat_arr_dims = [dim for axis in arr_dims for dim in axis] # Catch exceptions from wrong arr_dims argument try: arr_reshaped = arr.reshape(*arr.shape[:-rank], *flat_arr_dims) except ValueError as err: raise ValueError('arr_dims not compatible with arr.shape[-rank:] = ' + f'{arr.shape[-rank:]}') from err try: result = arr_reshaped.transpose(*transpose_axes).reshape(arr.shape) except TypeError as type_err: raise TypeError("Could not transpose the order. Are all elements of " + "'order' integers?") from type_err except ValueError as val_err: raise ValueError("Could not transpose the order. Are all elements " + "of 'order' unique and match the array?") from val_err return result
[docs]def mdot(arr: Sequence, axis: int = 0) -> ndarray: """Multiple matrix products along axis""" return functools.reduce(np.matmul, np.swapaxes(arr, 0, axis))
def integrate(f: ndarray, x: Optional[ndarray] = None, dx: float = 1.0) -> Union[ndarray, float, complex]: """Fast trapezoidal integration with small memory footprint. Parameters ---------- f: ndarray Function to be integrated. x: ndarray, optional Optional integration domain if the values are not evenly spaced. dx: float, optional Spacing. The default is 1.0. Returns ------- result: ndarray Integral over the last axis of *f*. See Also -------- numpy.trapz """ dx = np.diff(x) if x is not None else dx ret = f[..., 1:] + f[..., :-1] ret *= dx return ret.sum(axis=-1)/2
[docs]def remove_float_errors(arr: ndarray, eps_scale: Optional[float] = None) -> ndarray: """ Clean up arr by removing floating point numbers smaller than the dtype's precision multiplied by eps_scale. Treats real and imaginary parts separately. Obviously only works for arrays with norm ~1. """ arr = np.asanyarray(arr) if eps_scale is None: atol = np.finfo(arr.dtype).eps if arr.ndim: atol *= arr.shape[-1] else: atol = np.finfo(arr.dtype).eps*eps_scale if arr.ndim: if arr.dtype == float: arr[np.abs(arr) <= atol] = 0 else: arr.real[np.abs(arr.real) <= atol] = 0 arr.imag[np.abs(arr.imag) <= atol] = 0 else: if arr.dtype == float: arr.real = 0 if np.abs(arr) <= atol else arr else: arr.real = 0 if np.abs(arr.real) <= atol else arr.real arr.imag = 0 if np.abs(arr.imag) <= atol else arr.imag return arr
[docs]def oper_equiv(psi: Union[Operator, State], phi: Union[Operator, State], eps: Optional[float] = None, normalized: bool = False) -> Tuple[bool, float]: r""" Checks whether psi and phi are equal up to a global phase, i.e. .. math:: |\psi\rangle = e^{i\chi}|\phi\rangle \Leftrightarrow \langle \phi|\psi\rangle = e^{i\chi}, and returns the phase. If the first return value is false, the second is meaningless in this context. psi and phi can also be operators. Parameters ---------- psi, phi: qutip.Qobj or array_like Vectors or operators to be compared eps: float The tolerance below which the two objects are treated as equal, i.e., the function returns ``True`` if ``abs(1 - modulus) <= eps``. normalized: bool Flag indicating if *psi* and *phi* are normalized with respect to the Hilbert-Schmidt inner product :func:`dot_HS`. Examples -------- >>> psi = paulis[1] >>> phi = paulis[1]*np.exp(1j*1.2345) >>> oper_equiv(psi, phi) (True, 1.2345) """ # Convert qutip.Qobj's to numpy arrays psi, phi = [obj.full() if hasattr(obj, 'full') else obj for obj in (psi, phi)] psi, phi = np.atleast_2d(psi, phi) if eps is None: # Tolerance the floating point eps times the # of flops for the matrix # multiplication, i.e. for psi and phi n x m matrices 2*n**2*m eps = (max(np.finfo(psi.dtype).eps, np.finfo(phi.dtype).eps) * np.prod(psi.shape)*phi.shape[-1]*2) if not normalized: # normalization introduces more floating point error eps *= (np.prod(psi.shape[-2:])*phi.shape[-1]*2)**2 try: # Don't need to round at this point inner_product = dot_HS(psi, phi, eps=0) except ValueError as err: raise ValueError('psi and phi have incompatible dimensions!') from err if normalized: norm = 1 else: norm = np.sqrt(dot_HS(psi, psi, eps=0)*dot_HS(phi, phi, eps=0)) phase = np.angle(inner_product) modulus = abs(inner_product) return abs(norm - modulus) <= eps, phase
[docs]def dot_HS(U: Operator, V: Operator, eps: Optional[float] = None) -> Union[float, complex, ndarray]: r"""Return the Hilbert-Schmidt inner product of U and V, .. math:: \langle U, V\rangle_\mathrm{HS} := \mathrm{tr}(U^\dagger V). Parameters ---------- U, V: qutip.Qobj or ndarray Objects to compute the inner product of. eps: float The floating point precision. The result is rounded to `abs(int(np.log10(eps)))` decimals if `eps > 0`. Returns ------- result: float, complex The result rounded to precision eps. Examples -------- >>> U, V = paulis[1:3] >>> dot_HS(U, V) 0.0 >>> dot_HS(U, U) 2.0 """ # Convert qutip.Qobj's to numpy arrays if hasattr(U, 'full'): U = U.full() if hasattr(V, 'full'): V = V.full() if eps is None: # Tolerance is the dtype precision times the number of flops for the # matrix multiplication times two to be on the safe side try: eps = np.finfo(U.dtype).eps*np.prod(U.shape)*V.shape[-1]*2 except ValueError: # dtype is int and therefore exact eps = 0 if eps == 0: res = np.einsum('...ij,...ij', U.conj(), V) else: res = np.around(np.einsum('...ij,...ij', U.conj(), V), decimals=abs(int(np.log10(eps)))) return res if res.imag.any() else res.real
[docs]@parse_optional_parameters(spacing=('log', 'linear')) def get_sample_frequencies(pulse: 'PulseSequence', n_samples: int = 300, spacing: str = 'log', include_quasistatic: bool = False, omega_min: Optional[float] = None, omega_max: Optional[float] = None) -> ndarray: r"""Get *n_samples* sample frequencies spaced 'linear' or 'log'. The ultraviolet cutoff is taken to be one order of magnitude larger than the timescale of the pulse tau. In the case of log spacing, the values are clipped in the infrared at two orders of magnitude below the timescale of the pulse. Parameters ---------- pulse: PulseSequence The pulse to get frequencies for. n_samples: int, optional The number of frequency samples. Default is 300. spacing: str, optional The spacing of the frequencies. Either 'log' or 'linear', default is 'log'. include_quasistatic: bool, optional Include zero frequency. Default is False. omega_min, omega_max: float, optional Minimum and maximum angular frequencies included (DC notwithstanding). Default to :math:`2\pi\times 10^{-2}/\tau` and :math:`2\pi\times 10^{+1}/\Delta t_{\mathrm{min}}`. Returns ------- omega: ndarray The angular frequencies. """ xspace = np.geomspace if spacing == 'log' else np.linspace omega_min = 2*np.pi*1e-2/pulse.tau if omega_min is None else omega_min omega_max = 2*np.pi*1e+1/pulse.dt.min() if omega_max is None else omega_max omega = xspace(omega_min, omega_max, n_samples - include_quasistatic) if include_quasistatic: return np.insert(omega, 0, 0) return omega
[docs]def hash_array_along_axis(arr: ndarray, axis: int = 0) -> List[int]: """Return the hashes of arr along the first axis""" return [hash(arr.tobytes()) for arr in np.swapaxes(arr, 0, axis)]
[docs]def all_array_equal(it: Iterable) -> bool: """ Return ``True`` if all array elements of ``it`` are equal by hashing the bytes representation of each array. Note that this is not thread-proof. """ return len(set(hash(i.tobytes()) for i in it)) == 1
[docs]def progressbar(iterable: Iterable, *args, **kwargs): """ Progress bar for loops. Uses tqdm. Usage:: for i in progressbar(range(10)): do_something() """ return _tqdm(iterable, *args, **kwargs)
def progressbar_range(*args, show_progressbar: bool = True, **kwargs): """Wrapper for range() that shows a progressbar dependent on a kwarg. Parameters ---------- *args : Positional arguments passed through to :func:`range`. show_progressbar: bool, optional Return a range iterator with or without a progressbar. **kwargs : Keyword arguments passed through to :func:`progressbar`. Returns ------- it: Iterator Range iterator dressed with a progressbar if ``show_progressbar=True``. """ return progressbar(range(*args), disable=kwargs.pop('disable', not show_progressbar), **kwargs) class CalculationError(Exception): """Indicates a quantity could not be computed.""" def __init__(self, message: str) -> None: super().__init__(message)