Source code for graphix.sim.tensornet

import string
from copy import deepcopy

import numpy as np
import quimb.tensor as qtn
from quimb.tensor import Tensor, TensorNetwork

from graphix.clifford import CLIFFORD, CLIFFORD_CONJ, CLIFFORD_MUL
from graphix.ops import Ops, States


[docs]class TensorNetworkBackend: """Tensor Network Simulator for MBQC Executes the measurement pattern using TN expression of graph states. """
[docs] def __init__(self, pattern, graph_prep="auto", **kwargs): """ Parameters ---------- pattern : graphix.Pattern graph_prep : str 'parallel' : Faster method for preparing a graph state. The expression of a graph state can be obtained from the graph geometry. See https://journals.aps.org/pra/abstract/10.1103/PhysRevA.76.052315 for detail calculation. Note that 'N' and 'E' commands in the measurement pattern are ignored. 'sequential' : Sequentially execute N and E commands, strictly following the measurement pattern. In this strategy, All N and E commands executed sequentially. 'auto'(default) : Automatically select a preparation strategy based on the max degree of a graph **kwargs : Additional keyword args to be passed to quimb.tensor.TensorNetwork. """ self.pattern = pattern self.output_nodes = pattern.output_nodes self.results = deepcopy(pattern.results) if graph_prep in ["parallel", "sequential"]: self.graph_prep = graph_prep elif graph_prep == "opt": self.graph_prep = "parallel" print(f"graph preparation strategy '{graph_prep}' is deprecated and will be replaced by 'parallel'") elif graph_prep == "auto": max_degree = pattern.get_max_degree() if max_degree > 5: self.graph_prep = "sequential" else: self.graph_prep = "parallel" else: raise ValueError(f"Invalid graph preparation strategy: {graph_prep}") if self.graph_prep == "parallel": if not pattern.is_standard(): raise ValueError("parallel preparation strategy does not support not-standardized pattern") nodes, edges = pattern.get_graph() self.state = MBQCTensorNet( graph_nodes=nodes, graph_edges=edges, default_output_nodes=pattern.output_nodes, **kwargs, ) elif self.graph_prep == "sequential": self.state = MBQCTensorNet(default_output_nodes=pattern.output_nodes, **kwargs) self._decomposed_cz = _get_decomposed_cz() self._isolated_nodes = pattern.get_isolated_nodes()
[docs] def add_nodes(self, nodes): """Add nodes to the network Parameters ---------- nodes : iterator of int index set of the new nodes. """ if self.graph_prep == "sequential": self.state.add_qubits(nodes) elif self.graph_prep == "opt": pass
[docs] def entangle_nodes(self, edge): """Make entanglement between nodes specified by edge. Parameters ---------- edge : tuple of int edge specifies two target nodes of the CZ gate. """ if self.graph_prep == "sequential": old_inds = [self.state._dangling[str(node)] for node in edge] tids = self.state._get_tids_from_inds(old_inds, which="any") tensors = [self.state.tensor_map[tid] for tid in tids] new_inds = [gen_str() for _ in range(3)] # retag dummy indices for i in range(2): tensors[i].retag({"Open": "Close"}, inplace=True) self.state._dangling[str(edge[i])] = new_inds[i] CZ_tn = TensorNetwork( [ qtn.Tensor( self._decomposed_cz[0], [new_inds[0], old_inds[0], new_inds[2]], [str(edge[0]), "CZ", "Open"], ), qtn.Tensor( self._decomposed_cz[1], [new_inds[2], new_inds[1], old_inds[1]], [str(edge[1]), "CZ", "Open"], ), ] ) self.state.add_tensor_network(CZ_tn) elif self.graph_prep == "opt": pass
[docs] def measure(self, cmd): """Perform measurement of the node. In the context of tensornetwork, performing measurement equals to applying measurement operator to the tensor. Here, directly contracted with the projected state. Parameters ---------- cmd : list measurement command i.e. ['M', node, plane angle, s_domain, t_domain] """ if cmd[1] in self._isolated_nodes: vector = self.state.get_open_tensor_from_index(cmd[1]) probs = np.abs(vector) ** 2 probs = probs / (np.sum(probs)) result = np.random.choice([0, 1], p=probs) self.results[cmd[1]] = result buffer = 1 / probs[result] ** 0.5 else: # choose the measurement result randomly result = np.random.choice([0, 1]) self.results[cmd[1]] = result buffer = 2**0.5 # extract signals for adaptive angle s_signal = np.sum([self.results[j] for j in cmd[4]]) t_signal = np.sum([self.results[j] for j in cmd[5]]) angle = cmd[3] * np.pi if len(cmd) == 7: vop = cmd[6] else: vop = 0 if int(s_signal % 2) == 1: vop = CLIFFORD_MUL[1, vop] if int(t_signal % 2) == 1: vop = CLIFFORD_MUL[3, vop] proj_vec = proj_basis(angle, vop=vop, plane=cmd[2], choice=result) # buffer is necessary for maintaing the norm invariant proj_vec = proj_vec * buffer self.state.measure_single(cmd[1], basis=proj_vec)
[docs] def correct_byproduct(self, cmd): """Perform byproduct correction. Parameters ---------- cmd : list Byproduct command i.e. ['X' or 'Z', node, signal_domain] """ if np.mod(np.sum([self.results[j] for j in cmd[2]]), 2) == 1: if cmd[0] == "X": self.state.evolve_single(cmd[1], Ops.x, "X") elif cmd[0] == "Z": self.state.evolve_single(cmd[1], Ops.z, "Z")
[docs] def apply_clifford(self, cmd): """Apply single-qubit Clifford gate Parameters ---------- cmd : list clifford command. See https://arxiv.org/pdf/2212.11975.pdf for the detail. """ node_op = CLIFFORD[cmd[2]] self.state.evolve_single(cmd[1], node_op, "C")
def finalize(self): pass
class MBQCTensorNet(TensorNetwork): """Tensor Network Simulator interface for MBQC patterns, using quimb.tensor.core.TensorNetwork.""" def __init__( self, graph_nodes=None, graph_edges=None, default_output_nodes=None, ts=[], **kwargs, ): """ Initialize MBQCTensorNet. Parameters ---------- graph_nodes (optional): list of int node indices of the graph state. graph_edges (optional) : list of tuple of int edge indices of the graph state. default_output_nodes : list of int output node indices at the end of MBQC operations, if known in advance. ts (optional): quimb.tensor.core.TensorNetwork or empty list optional initial state. """ if isinstance(ts, MBQCTensorNet): super().__init__(ts=ts, **kwargs) self._dangling = ts._dangling self.default_output_nodes = default_output_nodes else: super().__init__(ts=ts, **kwargs) self._dangling = dict() self.default_output_nodes = default_output_nodes # prepare the graph state if graph_nodes and graph_edges are given if graph_nodes is not None and graph_edges is not None: self.set_graph_state(graph_nodes, graph_edges) def get_open_tensor_from_index(self, index): """Get tensor specified by node index. The tensor has a dangling edge. Parameters ---------- index : str node index Returns ------- numpy.ndarray : Specified tensor """ if isinstance(index, int): index = str(index) assert isinstance(index, str) tags = [index, "Open"] tid = list(self._get_tids_from_tags(tags, which="all"))[0] tensor = self.tensor_map[tid] return tensor.data def add_qubit(self, index, state="plus"): """Add a single qubit to the network. Parameters ---------- index : int index of the new qubit. state (optional): str or 2-element np.ndarray initial state of the new qubit. "plus", "minus", "zero", "one", "iplus", "iminus", or 1*2 np.ndarray (arbitrary state). """ ind = gen_str() tag = str(index) if state == "plus": vec = States.plus elif state == "minus": vec = States.minus elif state == "zero": vec = States.zero elif state == "one": vec = States.one elif state == "iplus": vec = States.iplus elif state == "iminus": vec = States.iminus else: assert state.shape == (2,), "state must be 2-element np.ndarray" assert np.isclose(np.linalg.norm(state), 1), "state must be normalized" vec = state tsr = qtn.Tensor(vec, [ind], [tag, "Open"]) self.add_tensor(tsr) self._dangling[tag] = ind def evolve_single(self, index, arr, label="U"): """Apply single-qubit operator to a qubit with the given index. Parameters ---------- index : int qubit index. arr : 2*2 numpy.ndarray single-qubit operator. label (optional): str label for the gate. """ old_ind = self._dangling[str(index)] tid = list(self._get_tids_from_inds(old_ind)) tensor = self.tensor_map[tid[0]] new_ind = gen_str() tensor.retag({"Open": "Close"}, inplace=True) node_ts = qtn.Tensor( arr, [new_ind, old_ind], [str(index), label, "Open"], ) self._dangling[str(index)] = new_ind self.add_tensor(node_ts) def add_qubits(self, indices, states="plus"): """Add qubits to the network Parameters ---------- indices : iterator of int indices of the new qubits. states (optional): str or 2*2 numpy.ndarray or list initial states of the new qubits. "plus", "minus", "zero", "one", "iplus", "iminus", or 1*2 np.ndarray (arbitrary state). list of the above, to specify the initial state of each qubit. """ if type(states) != list: states = [states] * len(indices) for i, ind in enumerate(indices): self.add_qubit(ind, state=states[i]) def measure_single(self, index, basis="Z", bypass_probability_calculation=True, outcome=None): """Measure a node in specified basis. Note this does not perform the partial trace. Parameters ---------- index : int index of the node to be measured. basis : str or np.ndarray default "Z". measurement basis, "Z" or "X" or "Y" for Pauli basis measurements. 1*2 numpy.ndarray for arbitrary measurement bases. bypass_probability_calculation : bool default True. if True, skip the calculation of the probability of the measurement result and use equal probability for each result. if False, calculate the probability of the measurement result from the state. outcome : int (0 or 1) User-chosen measurement result, giving the outcome of (-1)^{outcome}. Returns ------- int measurement result. """ if bypass_probability_calculation: if outcome is not None: result = outcome else: result = np.random.choice([0, 1]) # Basis state to be projected if type(basis) == np.ndarray: if outcome is not None: raise Warning("Measurement outcome is chosen but the basis state was given.") proj_vec = basis elif basis == "Z" and result == 0: proj_vec = States.zero elif basis == "Z" and result == 1: proj_vec = States.one elif basis == "X" and result == 0: proj_vec = States.plus elif basis == "X" and result == 1: proj_vec = States.minus elif basis == "Y" and result == 0: proj_vec = States.iplus elif basis == "Y" and result == 1: proj_vec = States.iminus else: raise ValueError("Invalid measurement basis.") else: raise NotImplementedError("Measurement probability calculation not implemented.") old_ind = self._dangling[str(index)] proj_ts = Tensor(proj_vec, [old_ind], [str(index), "M", "Close", "ancilla"]).H # add the tensor to the network tid = list(self._get_tids_from_inds(old_ind)) tensor = self.tensor_map[tid[0]] tensor.retag({"Open": "Close"}, inplace=True) self.add_tensor(proj_ts) return result def set_graph_state(self, nodes, edges): """Prepare the graph state without directly applying CZ gates. Parameters ---------- nodes : iterator of int set of the nodes edges : iterator of tuple set of the edges .. seealso:: :meth:`~graphix.sim.tensornet.TensorNetworkBackend.__init__()` """ ind_dict = dict() vec_dict = dict() for edge in edges: for node in edge: if node not in ind_dict.keys(): ind = gen_str() self._dangling[str(node)] = ind ind_dict[node] = [ind] vec_dict[node] = [] greater = edge[0] > edge[1] # true for 1/0, false for +/- vec_dict[edge[0]].append(greater) vec_dict[edge[1]].append(not greater) ind = gen_str() ind_dict[edge[0]].append(ind) ind_dict[edge[1]].append(ind) for node in nodes: if node not in ind_dict.keys(): ind = gen_str() self._dangling[str(node)] = ind self.add_tensor(Tensor(States.plus, [ind], [str(node), "Open"])) continue dim_tensor = len(vec_dict[node]) tensor = np.array( [ outer_product([States.vec[0 + 2 * vec_dict[node][i]] for i in range(dim_tensor)]), outer_product([States.vec[1 + 2 * vec_dict[node][i]] for i in range(dim_tensor)]), ] ) * 2 ** (dim_tensor / 4 - 1.0 / 2) self.add_tensor(Tensor(tensor, ind_dict[node], [str(node), "Open"])) def get_basis_coefficient(self, basis, normalize=True, indices=None, **kwagrs): """Calculate the coefficient of a given computational basis. Parameters ---------- basis : int or str computational basis expressed in binary (str) or integer, e.g. 101 or 5. normalize (optional): bool if True, normalize the coefficient by the norm of the entire state. indices (optional): list of int target qubit indices to compute the coefficients, default is the MBQC output nodes (self.default_output_nodes). Returns ------- coef : complex coefficient """ if indices == None: indices = self.default_output_nodes if isinstance(basis, str): basis = int(basis, 2) tn = self.copy() # prepare projected state for i in range(len(indices)): node = str(indices[i]) exp = len(indices) - i - 1 if (basis // 2**exp) == 1: state_out = States.one # project onto |1> basis -= 2**exp else: state_out = States.zero # project onto |0> tensor = Tensor(state_out, [tn._dangling[node]], [node, f"qubit {i}", "Close"]) # retag old_ind = tn._dangling[node] tid = list(tn._get_tids_from_inds(old_ind))[0] tn.tensor_map[tid].retag({"Open": "Close"}) tn.add_tensor(tensor) # contraction tn_simplified = tn.full_simplify("ADCR") coef = tn_simplified.contract(output_inds=[], **kwagrs) if normalize: norm = self.get_norm() return coef / norm else: return coef def get_basis_amplitude(self, basis, **kwagrs): """Calculate the probability amplitude of the specified computational basis state. Parameters ---------- basis : int or str computational basis expressed in binary (str) or integer, e.g. 101 or 5. Returns ------- float : the probability amplitude of the specified state. """ if isinstance(basis, str): basis = int(basis, 2) coef = self.get_basis_coefficient(basis, **kwagrs) return abs(coef) ** 2 def to_statevector(self, indices=None, **kwagrs): """Retrieve the statevector from the tensornetwork. This method tends to be slow however we plan to parallelize this. Parameters ---------- indices (optional): list of int target qubit indices. Default is the MBQC output nodes (self.default_output_nodes). Returns ------- numpy.ndarray : statevector """ if indices == None: n_qubit = len(self.default_output_nodes) else: n_qubit = len(indices) statevec = np.zeros(2**n_qubit, np.complex128) for i in range(len(statevec)): statevec[i] = self.get_basis_coefficient(i, normalize=False, indices=indices, **kwagrs) return statevec / np.linalg.norm(statevec) def get_norm(self, **kwagrs): """Calculate the norm of the state. Returns ------- float : norm of the state """ tn_cp1 = self.copy() tn_cp2 = tn_cp1.conj() tn = TensorNetwork([tn_cp1, tn_cp2]) tn_simplified = tn.full_simplify("ADCR") norm = abs(tn_simplified.contract(output_inds=[], **kwagrs)) ** 0.5 return norm def expectation_value(self, op, qubit_indices, output_node_indices=None, **kwagrs): """Calculate expectation value of the given operator, Parameters ---------- op : numpy.ndarray single- or multi-qubit Hermitian operator qubit_indices : list of int Applied positions of **logical** qubits. output_node_indices (optional): list of int Indices of nodes in the entire TN, that remain unmeasured after MBQC operations. Default is the output nodes specified in measurement pattern (self.default_output_nodes). Returns ------- float : Expectation value """ if output_node_indices == None: if self.default_output_nodes == None: raise ValueError("output_nodes is not set.") else: target_nodes = [self.default_output_nodes[ind] for ind in qubit_indices] out_inds = self.default_output_nodes else: target_nodes = [output_node_indices[ind] for ind in qubit_indices] out_inds = output_node_indices op_dim = len(qubit_indices) op = op.reshape([2 for _ in range(2 * op_dim)]) new_ind_left = [gen_str() for _ in range(op_dim)] new_ind_right = [gen_str() for _ in range(op_dim)] tn_cp_left = self.copy() op_ts = Tensor(op, new_ind_right + new_ind_left, ["Expectation Op.", "Close"]) tn_cp_right = tn_cp_left.conj() # reindex & retag for node in out_inds: old_ind = tn_cp_left._dangling[str(node)] tid_left = list(tn_cp_left._get_tids_from_inds(old_ind))[0] tid_right = list(tn_cp_right._get_tids_from_inds(old_ind))[0] if node in target_nodes: tn_cp_left.tensor_map[tid_left].reindex({old_ind: new_ind_left[target_nodes.index(node)]}, inplace=True) tn_cp_right.tensor_map[tid_right].reindex( {old_ind: new_ind_right[target_nodes.index(node)]}, inplace=True ) tn_cp_left.tensor_map[tid_left].retag({"Open": "Close"}) tn_cp_right.tensor_map[tid_right].retag({"Open": "Close"}) tn_cp_left.add([op_ts, tn_cp_right]) # contraction tn_cp_left = tn_cp_left.full_simplify("ADCR") exp_val = tn_cp_left.contract(output_inds=[], **kwagrs) norm = self.get_norm(**kwagrs) return exp_val / norm**2 def evolve(self, operator, qubit_indices, decompose=True, **kwagrs): """Apply an arbitrary operator to the state. Parameters ---------- operator : numpy.ndarray operator. qubit_indices : list of int Applied positions of **logical** qubits. decompose : bool, optional default True whether a given operator will be decomposed or not. If True, operator is decomposed into Matrix Product Operator(MPO) """ if len(operator.shape) != len(qubit_indices) * 2: shape = [2 for _ in range(2 * len(qubit_indices))] operator = operator.reshape(shape) # operator indices node_indices = [self.default_output_nodes[index] for index in qubit_indices] old_ind_list = [self._dangling[str(index)] for index in node_indices] new_ind_list = [gen_str() for _ in range(len(node_indices))] for i in range(len(node_indices)): self._dangling[str(node_indices[i])] = new_ind_list[i] ts = Tensor( operator, new_ind_list + old_ind_list, [str(index) for index in node_indices], ) if decompose: # decompose tensor into Matrix Product Operator(MPO) tensors = [] bond_inds = {0: None} for i in range(len(node_indices) - 1): bond_inds[i + 1] = gen_str() left_inds = [new_ind_list[i], old_ind_list[i]] if bond_inds[i]: left_inds.append(bond_inds[i]) unit_tensor, ts = ts.split(left_inds=left_inds, bond_ind=bond_inds[i + 1], **kwagrs) tensors.append(unit_tensor) tensors.append(ts) op_tensor = TensorNetwork(tensors) else: op_tensor = ts self.add(op_tensor) def copy(self, deep=False): """Returns the copy of this object. Parameters ---------- deep : bool, optional Defaults to False. Whether to copy the underlying data as well. Returns ------- TensorNetworkBackend : duplicated object """ if deep: return deepcopy(self) else: return self.__class__(ts=self) def _get_decomposed_cz(): """Returns the decomposed cz tensors. This is an internal method. CZ gate can be decomposed into two 3-rank tensors(Schmidt rank = 2). Decomposing into low-rank tensors is important preprocessing for the optimal contraction path searching problem. So, in this backend, the DECOMPOSED_CZ gate is applied instead of the original CZ gate. Decomposing CZ gate output output | | | | -------- SVD --- --- | CZ | --> |L|----|R| -------- --- --- | | | | input input 4-rank x1 3-rank x2 """ cz_ts = Tensor( Ops.cz.reshape((2, 2, 2, 2)).astype(np.float64), ["O1", "O2", "I1", "I2"], ["CZ"], ) decomposed_cz = cz_ts.split(left_inds=["O1", "I1"], right_inds=["O2", "I2"], max_bond=4) return [ decomposed_cz.tensors[0].data, decomposed_cz.tensors[1].data, ]
[docs]def gen_str(): """Generate dummy string for einsum.""" result = qtn.rand_uuid() return result
[docs]def proj_basis(angle, vop, plane, choice): """the projected statevector. Parameters ---------- angle : float measurement angle vop : int CLIFFORD index plane : str measurement plane choice : int measurement result Returns ------- numpy.ndarray : projected state """ if plane == "XY": vec = States.vec[0 + choice] rotU = Ops.Rz(angle) elif plane == "YZ": vec = States.vec[4 + choice] rotU = Ops.Rx(angle) elif plane == "XZ": vec = States.vec[0 + choice] rotU = Ops.Ry(-angle) vec = np.matmul(rotU, vec) vec = np.matmul(CLIFFORD[CLIFFORD_CONJ[vop]], vec) return vec
[docs]def outer_product(vectors): """outer product of the given vectors Parameters ---------- vectors : list of vector vectors Returns ------- numpy.ndarray : tensor object. """ subscripts = string.ascii_letters[: len(vectors)] subscripts = ",".join(subscripts) + "->" + subscripts return np.einsum(subscripts, *vectors)