Skip to main content

View on GitHub

Open this notebook in GitHub to run it yourself

📊 What is Network Traffic Optimization?

Network Traffic Optimization is a critical network management used in Telecommunication: Input: Weighted directed graph and a set of demands. Goal: Satisfy the demands, while minimizing the latency, without violating the two constraints. 1. Capacity per edge (unit capacity): dZ(d,e)1\sum_d Z(d,e) \leq 1 2. Flow conservation for each demand and node The following notebook demonstrates a solution to the network traffic optimization problem, applying the QAOA algorithm.

1️⃣ Setup

import math

import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from scipy.optimize import Bounds, LinearConstraint, milp, minimize
from tqdm import tqdm

from classiq import *
from classiq.execution import ExecutionPreferences, ExecutionSession

CLASSIQ = {
    "lime": "#D7F75B",
    "gray": "#C9CBC0",
    "black": "#191919",
    "teal": "#0C7489",
    "dgray": "#282828",
    "offw": "#F4F9E9",
    "cyan": "#109DA3",
    "pink": "#F43764",
}

2️⃣ Problem Definition

We define the graph by introducing six vertices (AFA-F), V, and connecting edges, E. Three demands are introduced, each with start and terminal nodes; our goal is to find the minimal-latency disjoint routes that connect the start and terminal nodes of all the demands. Disjoint routes do not share a node at each time-step.
V = ["A", "B", "C", "D", "E", "F"]
E_with_lat = [
    ("A", "B", 2),
    ("B", "E", 2),
    ("A", "C", 3),
    ("C", "E", 1),
    ("B", "D", 1),
    ("D", "F", 2),
    ("C", "D", 2),
    ("E", "F", 1),
]
demands = [
    {"name": "d1", "s": "A", "t": "E"},
    {"name": "d2", "s": "B", "t": "F"},
    {"name": "d3", "s": "C", "t": "F"},
]

E = [(u, v) for (u, v, _) in E_with_lat]
lat_vec = np.array([w for (_, _, w) in E_with_lat], dtype=float)
lat = {(u, v): w for (u, v, w) in E_with_lat}
edge_ids = [f"e{i+1}" for i in range(len(E))]
graph = nx.DiGraph()
graph.add_nodes_from(V)
graph.add_weighted_edges_from(E_with_lat)
E = graph.edges()
D = len(demands)
m = len(E)
print(f"Network: {len(V)} nodes, {m} edges, {D} demands")
Output:

Network: 6 nodes, 8 edges, 3 demands
  

Z = np.zeros((D, m))

# The mixer Hamiltonian is effectively X/2 and has eigenvalues -0.5 and +

0.

5. So the difference between minimum and maximum eigenvalues is exactly 

1.
# This can be rescaled by a global scaling parameter.
GlobalScalingParameter = 1

# The constraint Hamiltonian has the property that a minimal constraint violation is 1 and no constraint violation is 

0.
# We wish to normalise the constraint Hamiltonian relative to the total cost Hamiltonian such that the constraint violation will be 1~2 x larger than the maximal difference between total cost values.
RelativeConstraintNormalisation = 1

# The cost Hamiltonian should be similar in eigenvalue difference to the mixer Hamiltonian and should be normalised to about 

1.
# To find the exact normalization requires solving this NP hard problem, so we always use an approximation.
# Since this is approximate, there is a relative scaling parameter we can tweak
RelativeCostNormalisation = 1 / 5

TotalCostNormalisation = GlobalScalingParameter * RelativeCostNormalisation
TotalConstraintNormalisation = RelativeConstraintNormalisation * TotalCostNormalisation

# Normalize latencies
min_solution_guess = (
    8  # can be calculated with Dijkstra when removing the single assignment constraint
)
max_solution_guess = 11  # any guess that fits the constraints will work

lat_normalized = {}
for e, w in lat.items():
    lat_normalized[e] = (
        w * TotalCostNormalisation / (max_solution_guess - min_solution_guess)
    )


c = np.tile(lat_vec, D)

# Variable bounds: 0 <= Z[d,e] <= 1 (binary)
lb = np.zeros(D * m)
ub = np.ones(D * m)
integrality = np.ones(D * m, dtype=int)


# Helper: map (d,e) -> flat index
def fidx(d_idx, e_idx):
    return d_idx * m + e_idx


lin_constraints = []

# (1) Capacity per edge (unit capacity): sum_d Z[d,e] <= 1
A_cap = np.zeros((m, D * m))
for e_idx in range(m):
    for d_idx in range(D):
        A_cap[e_idx, fidx(d_idx, e_idx)] = 1.0
cap_lb = -np.inf * np.ones(m)
cap_ub = np.ones(m)
lin_constraints.append(LinearConstraint(A_cap, cap_lb, cap_ub))


# (2) Flow conservation for each demand and node:
# For each demand d and node v: sum_out Z[d,e] - sum_in Z[d,e] = b_{d,v}
def incidence_row_for(d_idx, v):
    row = np.zeros(D * m)
    for e_idx, (u, v2) in enumerate(E):
        if u == v:  # outgoing
            row[fidx(d_idx, e_idx)] += 1.0
        if v2 == v:  # incoming
            row[fidx(d_idx, e_idx)] -= 1.0
    return row


A_flow_rows = []
b_list = []
for d_idx, d in enumerate(demands):
    for v in V:
        b = 0.0
        if v == d["s"]:
            b = 1.0
        elif v == d["t"]:
            b = -1.0
        A_flow_rows.append(incidence_row_for(d_idx, v))
        b_list.append(b)
A_flow = np.vstack(A_flow_rows)
b_vec = np.array(b_list)
lin_constraints.append(LinearConstraint(A_flow, b_vec, b_vec))

Visualize

pos = {
    "A": (0, 0.6),
    "B": (1, 1.1),
    "C": (1, 0.1),
    "D": (2, 0.6),
    "E": (2, 1.1),
    "F": (3, 0.6),
}
plt.figure(figsize=(10, 5))
nx.draw(
    graph,
    pos,
    with_labels=True,
    node_size=1200,
    node_color=CLASSIQ["lime"],
    edge_color=CLASSIQ["pink"],
    font_size=14,
    arrows=True,
)
nx.draw_networkx_edge_labels(
    graph, pos, edge_labels={(u, v): lat[(u, v)] for (u, v) in E}
)
plt.title(
    f"Network Topology: Directed Graph with {len(V)} nodes, {m} edges and {D} demands",
    fontsize=16,
)
plt.show()
output

3️⃣ Classical Solution

# ---------------------------
# Solve MILP
# ---------------------------
res = milp(
    c=c,
    integrality=integrality,
    bounds=Bounds(lb, ub),
    constraints=lin_constraints,
    options={"disp": False},
)

# Convert back to 2D Z[d,e]
Z = np.round(res.x).astype(int).reshape(D, m)

Visualize

def visualize_solution(Z_matrix, title="Routing Solution"):
    colors_map = [CLASSIQ["pink"], CLASSIQ["cyan"], CLASSIQ["teal"]]
    edge_to_demand = {}
    for d_idx in range(D):
        for e_idx, (u, v) in enumerate(E):
            if Z_matrix[d_idx, e_idx] == 1:
                edge_to_demand[(u, v)] = d_idx
    edge_colors = [
        (
            colors_map[edge_to_demand[(u, v)]]
            if (u, v) in edge_to_demand
            else CLASSIQ["pink"]
        )
        for u, v in E
    ]
    edge_widths = [4.0 if (u, v) in edge_to_demand else 1.0 for u, v in E]
    plt.figure(figsize=(10, 5))
    nx.draw(
        graph,
        pos,
        with_labels=True,
        node_size=1200,
        node_color=CLASSIQ["lime"],
        font_size=14,
        edge_color=edge_colors,
        width=edge_widths,
        arrows=True,
    )
    nx.draw_networkx_edge_labels(
        graph, pos, edge_labels={(u, v): lat[(u, v)] for (u, v) in E}
    )
    from matplotlib.patches import Patch

    plt.legend(
        handles=[
            Patch(facecolor=colors_map[i], label=demands[i]["name"]) for i in range(D)
        ],
        loc="upper right",
    )
    plt.title(title, fontsize=16)
    plt.show()


visualize_solution(Z, "Classical MILP Solution")
output
# ---------------------------
# Decode / present results
# ---------------------------
# Table: Z with demand names and edge IDs
Z_df = pd.DataFrame(
    Z, index=[f"{d['name']}({d['s']}{d['t']})" for d in demands], columns=edge_ids
)

# Table: chosen edges per demand + latency sums
rows = []
for d_idx, d in enumerate(demands):
    chosen_edges = [(u, v) for e_idx, (u, v) in enumerate(E) if Z[d_idx, e_idx] == 1]
    chosen_labels = [f"{u}{v}" for (u, v) in chosen_edges]
    latency_sum = sum(lat[e] for e in chosen_edges)
    rows.append(
        {
            "Demand": f"{d['name']} ({d['s']}{d['t']})",
            "Chosen edges": ", ".join(chosen_labels) if chosen_labels else "(none)",
            "Latency sum": latency_sum,
        }
    )
df_chosen = pd.DataFrame(rows)


# Try to reconstruct s→t sequence (simple walk)
def reconstruct_path(chosen_edges_list, s, t):
    nxt = {}
    for u, v in chosen_edges_list:
        nxt[u] = v
    path = [s]
    visited = set([s])
    while path[-1] != t and path[-1] in nxt:
        nxt_node = nxt[path[-1]]
        if nxt_node in visited:
            break
        path.append(nxt_node)
        visited.add(nxt_node)
    return path


seq_rows = []
for d_idx, d in enumerate(demands):
    chosen = [(u, v) for e_idx, (u, v) in enumerate(E) if Z[d_idx, e_idx] == 1]
    seq = reconstruct_path(chosen, d["s"], d["t"])
    seq_rows.append(
        {
            "Demand": f"{d['name']} ({d['s']}{d['t']})",
            "Sequence": "→".join(seq),
            "Is complete s→t?": (
                len(seq) >= 2 and seq[0] == d["s"] and seq[-1] == d["t"]
            ),
        }
    )
df_sequences = pd.DataFrame(seq_rows)

# Display
print("Z (2D) solution matrix — rows=demand, cols=edge\n", Z_df)
print("Chosen edges & per-demand latency\n", df_chosen)
print("Reconstructed s→t sequences (validity check)\n", df_sequences)
Output:

Z (2D) solution matrix — rows=demand, cols=edge
            e1  e2  e3  e4  e5  e6  e7  e8
  d1(A→E)   1   0   1   0   0   0   0   0
  d2(B→F)   0   0   0   1   0   0   1   0
  d3(C→F)   0   0   0   0   1   0   0   1
  Chosen edges & per-demand latency
        Demand Chosen edges  Latency sum
  0  d1 (A→E)     A→B, B→E            4
  1  d2 (B→F)     B→D, D→F            3
  2  d3 (C→F)     C→E, E→F            2
  Reconstructed s→t sequences (validity check)
        Demand Sequence  Is complete s→t?
  0  d1 (A→E)    A→B→E              True
  1  d2 (B→F)    B→D→F              True
  2  d3 (C→F)    C→E→F              True
  

4️⃣ Quantum Solution with QAOA

# Objective: sum_d sum_e lat_e * Z[d,e]


def objective_func(assigned_z):
    return sum(
        (lat_normalized[e] * assigned_z[fidx(d_idx, e_idx)])
        for d_idx, d in enumerate(demands)
        for e_idx, e in enumerate(graph.edges())
    )


def index_of_edge(u, v):
    for e_idx, edge in enumerate(graph.edges()):
        if edge[0] == u and edge[1] == v:
            return e_idx
    raise AssertionError("Edge not found")


def constraint_flow_conservation(assigned_z):
    total_flow = 0
    for d_idx, d in enumerate(demands):
        for v in V:
            out_sum = sum(
                assigned_z[fidx(d_idx, index_of_edge(v, v_out))]
                for v_out in graph.successors(v)
            )
            in_sum = sum(
                assigned_z[fidx(d_idx, index_of_edge(v_in, v))]
                for v_in in graph.predecessors(v)
            )
            source_correction = destination_correction = 0
            if d["s"] == v:
                source_correction = 1
            if d["t"] == v:
                destination_correction = 1
            node_demand_flow = (
                in_sum - out_sum + source_correction - destination_correction
            )
            total_flow += node_demand_flow**2
    return TotalConstraintNormalisation * total_flow


def constraint_single_assignment(assigned_z):

    def inverse(bit):
        return 1 - bit

    more_than_a_single_assignment = 0

    for e_idx, e in enumerate(graph.edges()):
        assignments_of_e = [
            assigned_z[fidx(d_idx, e_idx)] for d_idx, d in enumerate(demands)
        ]
        all_assignments = math.prod(assignments_of_e)
        two_assignments = 0
        for idx, current_assignment in enumerate(assignments_of_e):
            rest_of_assignments = [
                other_e
                for other_idx, other_e in enumerate(assignments_of_e)
                if idx != other_idx
            ]
            rest_of_assignments_off = math.prod(rest_of_assignments)
            two_assignments += inverse(current_assignment) * rest_of_assignments_off
        more_than_a_single_assignment += all_assignments + two_assignments
    return TotalConstraintNormalisation * more_than_a_single_assignment


def cost_hamiltonian(assigned_Z):
    objective_weight = 1
    flow_conservation_weight = 1
    single_assignment_weight = 1
    return (
        objective_weight * objective_func(assigned_Z)
        + flow_conservation_weight * constraint_flow_conservation(assigned_Z)
        + single_assignment_weight * constraint_single_assignment(assigned_Z)
    )

NUM_LAYERS = 5


@qfunc
def mixer_layer(beta: CReal, qba: QArray[QBit]):
    apply_to_all(lambda q: RX(GlobalScalingParameter * beta, q), qba),


@qfunc
def main(
    params: CArray[CReal, 2 * NUM_LAYERS],
    z: Output[QArray[QBit, m * D]],
) -> None:
    allocate(z)
    hadamard_transform(z)

    repeat(
        count=NUM_LAYERS,
        iteration=lambda i: (
            phase(cost_hamiltonian(z), params[2 * i]),
            mixer_layer(params[2 * i + 1], z),
        ),
    )


qprog = synthesize(main)

NUM_SHOTS = 10000
es = ExecutionSession(
    qprog, execution_preferences=ExecutionPreferences(num_shots=NUM_SHOTS)
)


def initial_qaoa_params(NUM_LAYERS) -> np.ndarray:
    initial_gammas = math.pi * np.linspace(
        1 / (2 * NUM_LAYERS), 1 - 1 / (2 * NUM_LAYERS), NUM_LAYERS
    )
    initial_betas = math.pi * np.linspace(
        1 - 1 / (2 * NUM_LAYERS), 1 / (2 * NUM_LAYERS), NUM_LAYERS
    )

    initial_params = []

    for i in range(NUM_LAYERS):
        initial_params.append(initial_gammas[i])
        initial_params.append(initial_betas[i])

    return np.array(initial_params)


initial_params = initial_qaoa_params(NUM_LAYERS)

cost_func = lambda state: cost_hamiltonian(state["z"])


def estimate_cost_func(params):
    objective_val = es.estimate_cost(cost_func, {"params": params.tolist()})
    objective_values.append(objective_val)
    # print(objective_val)
    return objective_val


# Record the steps of the optimization
intermediate_params = []
objective_values = []


# Define the callback function to store the intermediate steps
def callback(xk):
    intermediate_params.append(xk)
    objective_values.append(es.estimate_cost(cost_func, {"params": xk.tolist()}))

MAX_ITERATIONS = 10
with tqdm(total=MAX_ITERATIONS, desc="Optimization Progress", leave=True) as pbar:

    def progress_bar(xk: np.ndarray) -> None:
        pbar.update(1)  # increment progress bar

    optimization_res = minimize(
        estimate_cost_func,
        x0=initial_params,
        method="COBYLA",
        # callback=callback,
        options={"maxiter": MAX_ITERATIONS},
        callback=progress_bar,
    )

res = es.sample({"params": optimization_res.x.tolist()})
print(f"Optimized parameters: {optimization_res.x.tolist()}")
Output:

Optimization Progress: 11it [03:06, 16.93s/it]
  

Output:

Optimized parameters: [0.3141592653589793, 2.827433388230814, 0.942477796076938, 2.199114857512855, 1.5707963267948966, 1.5707963267948966, 2.199114857512855, 0.9424777960769377, 2.827433388230814, 0.3141592653589793]
  

5️⃣ Results Analysis

def check_validity(assigned_z: list[int]) -> bool:
    if constraint_flow_conservation(assigned_z) != 0:
        return False
    if constraint_single_assignment(assigned_z) != 0:
        return False
    return True

sorted_counts = sorted(res.parsed_counts, key=lambda pc: pc.shots, reverse=True)
count = 0


def print_res(sampled):
    color = "92m"
    assigned_z = sampled.state["z"]
    if not check_validity(assigned_z):
        color = "91m"
    print(
        f"\033[{color}solution={assigned_z} probability={sampled.shots/NUM_SHOTS} cost={cost_hamiltonian(assigned_z)}\033[0m, objective={objective_func(assigned_z)/TotalCostNormalisation * (max_solution_guess-min_solution_guess)}"
    )


valid_solutions = []
for sampled in sorted_counts[:15]:
    print_res(sampled)
for idx, sampled in enumerate(sorted_counts):
    if check_validity(sampled.state["z"]):
        valid_solutions.append(idx)
        count += sampled.shots / NUM_SHOTS
        print(f"Valid solution at index {idx}")
print(count)
Output:
solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0049 cost=0.7333333333333334, objective=5.0
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0047 cost=0.9333333333333333, objective=2.0
  solution=[1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0047 cost=0.6, objective=8.999999999999998
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0] probability=0.0037 cost=1.0, objective=3.0
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0035 cost=1.0, objective=3.0
  solution=[0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0032 cost=0.8, objective=8.999999999999998
  solution=[0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0] probability=0.0031 cost=0.7333333333333333, objective=10.999999999999998
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.003 cost=0.9333333333333333, objective=5.0
  solution=[1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0029 cost=0.8, objective=5.999999999999999
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0] probability=0.0027 cost=0.8666666666666667, objective=6.999999999999999
  solution=[0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0027 cost=0.8666666666666667, objective=6.999999999999999
  solution=[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1] probability=0.0026 cost=0.8666666666666667, objective=6.999999999999999
  solution=[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0] probability=0.0026 cost=1.0666666666666667, objective=4.0
  solution=[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1] probability=0.0026 cost=1.2666666666666668, objective=1.0
  solution=[0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0] probability=0.0025 cost=0.8666666666666667, objective=6.999999999999999
  Valid solution at index 2
  Valid solution at index 6
  0.0078
  

Visualize Valid Solutions

for i in valid_solutions[:3]:
    Z_quantum = np.array(sorted_counts[i].state["z"]).reshape(D, m)
    visualize_solution(
        Z_quantum,
        f'Quantum Solution at index {i}\n Objective={objective_func(sorted_counts[i].state["z"])/TotalCostNormalisation * (max_solution_guess-min_solution_guess)}',
    )
output output