Skip to content

Autocallables with Integration Amplitude Loading

View on GitHub
In this Notebook we will go through the implementation of the Integration Amplitude Loading Method for the autocallables based on https://arxiv.org/pdf/2402.05574.pdf and https://arxiv.org/pdf/2012.03819 using classiq platform QMOD language.

Data Definitions

import numpy as np
import scipy

S = 18 # notional value (can be asset initial price)
dt = 1 #in year
NUM_MARKET_DAYS_IN_YEAR = 250

DAILY_SIGMA = 0.0150694 #daily std log return
DAILY_MU = 0.00050963 #daily mean log return

SIGMA = DAILY_SIGMA*np.sqrt(NUM_MARKET_DAYS_IN_YEAR) #annual
MU = DAILY_MU*NUM_MARKET_DAYS_IN_YEAR #annual

b = 0.7 # binary barrier (to check with returns)
K_comp = 1 # K put (to check with returns)
K = K_comp*S # K put (to to use for the payoff)
r= 0.04 #annual risk free rate

K_bin_1 = 1.1 # K binaries (to check with returns)
K_bin_2 = 1.1 # K binaries (to check with returns)
K_bin_norm = [np.log(K_bin_1), np.log(K_bin_2)] # K binaries (to check with log returns)
bin_1_payoff = 2*np.exp(-r*1) # payoff binaries already discounted
bin_2_payoff = 3*np.exp(-r*2) # payoff binaries already discounted


NUM_QUBITS = 2
TIME_STEPS = 3 # 3 years
PRECISION = 2
NUM_QUBITS*TIME_STEPS

Gaussian State preparation

def gaussian_discretization(num_qubits, mu=0, sigma=1, stds_around_mean_to_include=3):
    lower = mu - stds_around_mean_to_include * sigma
    upper = mu + stds_around_mean_to_include * sigma
    num_of_bins = 2**num_qubits
    sample_points = np.linspace(lower, upper, num_of_bins + 1)

    def single_gaussian(x: np.ndarray, _mu: float, _sigma: float) -> np.ndarray:
        cdf = scipy.stats.norm.cdf(x, loc=_mu, scale=_sigma)
        return cdf[1:] - cdf[0:-1]

    non_normalized_pmf = (single_gaussian(sample_points, mu, sigma),)
    real_probs = non_normalized_pmf / np.sum(non_normalized_pmf)
    return sample_points[:-1], real_probs[0].tolist()


grid_points, probabilities = gaussian_discretization(NUM_QUBITS, stds_around_mean_to_include=3)

STEP_X = grid_points[1] - grid_points[0]
MIN_X = grid_points[0]

Rescalings

Compute \(R_T^{max}\) resulting from discretization.

R_T_MAX_PROP = 0

if MU>0 and SIGMA>0:
    R_T_MAX_PROP = TIME_STEPS*(MU*dt + SIGMA*np.sqrt(dt)*(grid_points[-1]))

R_T_MIN_PROP = 0

if MU>0 and SIGMA>0:
    R_T_MIN_PROP = TIME_STEPS*(MU*dt + SIGMA*np.sqrt(dt)*(grid_points[0]))

R_T_MAX_PROP = np.max([np.abs(R_T_MIN_PROP), np.abs(R_T_MAX_PROP)])

R_T_MAX = np.log(np.max([(np.exp(TIME_STEPS*r)*bin_2_payoff)+K, (np.exp(TIME_STEPS*r)*bin_1_payoff)+K, K])/S)
R_T_MAX = max(R_T_MAX_PROP, R_T_MAX)

In two's complement, given \(N\) as number of qubits, we can represent from \(-2^{N-1}\) and \(2^{N-1}-1\).

a = 1/(2**PRECISION)
if R_T_MAX < 1:
    int_places = 1
else:
    int_places = np.ceil(np.log2(np.ceil(R_T_MAX))) + 1
num_norm_factor = ((np.exp(a*(2**(int_places+PRECISION)))-1))
den_norm_factor = (np.exp(a*(2**(int_places+PRECISION-1)+1))) 
norm_factor = S*(num_norm_factor/den_norm_factor)

Compute constant rotations

def compute_constant_rotation(const_payoff):
    b1 = const_payoff*np.exp(r*TIME_STEPS)
    num1 = (b1 + K)*den_norm_factor
    den1 = S*num_norm_factor
    den2 = num_norm_factor
    num2 = 1
    angle = (num1/den1) - (num2/den2)
    return 2*np.arcsin(np.sqrt(angle))
bin_1_payoff_rotation= compute_constant_rotation(bin_1_payoff)
bin_2_payoff_rotation= compute_constant_rotation(bin_2_payoff)
zero_rotation = compute_constant_rotation(0)
bit_T_normalize = (S*np.exp(-r*TIME_STEPS))/den_norm_factor

def postprocessing(x):
    return (x*norm_factor*np.exp(-r*TIME_STEPS)) + bit_T_normalize - (K*np.exp(-r*TIME_STEPS))

Verifications

bin_1_payoff
postprocessing((np.sin((compute_constant_rotation(bin_1_payoff)/2)))**2)
bin_2_payoff
postprocessing((np.sin((compute_constant_rotation(bin_2_payoff)/2)))**2)
postprocessing((np.sin((compute_constant_rotation(0)/2)))**2)

Compute constants for comparisons

b_norm = np.log(b)
K_norm = np.log(K_comp)

Classical Payoff

import pandas as pd
from itertools import product

simulation = pd.DataFrame.from_dict({"quantum_samples": list(range(len(grid_points))) ,"classical_samples": grid_points, "probabilities": probabilities} )

binary_combinations = list(product( range(2**NUM_QUBITS), repeat=TIME_STEPS))
sim = pd.DataFrame(binary_combinations)
new_col = ["time_" +str(i) for i in range(TIME_STEPS)]
sim.columns = new_col
def round_down(a):
    precision_factor = 2 ** (PRECISION)
    return np.floor(a * precision_factor) / precision_factor
def round_factor(a):
    #precision_factor = 2 ** (PRECISION)
    #return np.round(a * precision_factor) / precision_factor
    return floor_factor(a)

def floor_factor(a):
    precision_factor = 2 ** (PRECISION)

    if a >= 0:
        return np.floor(a * precision_factor) / precision_factor
    else:
        return np.ceil(a * precision_factor) / precision_factor
for i in range(TIME_STEPS):
    sim = sim.merge(simulation, left_on="time_" + str(i) , right_on="quantum_samples")
    sim = sim.drop("quantum_samples", axis=1)
    new_col = new_col + ["c_" + str(i), "q_prob_" + str(i)]
    sim.columns  = new_col
    sim["log_ret_val_" + str(i)] = round_factor(MU*dt + np.sqrt(dt)*SIGMA*MIN_X) + round_factor(SIGMA*np.sqrt(dt)*STEP_X) * sim["time_" + str(i)]
    new_col = new_col + ["log_ret_val_" + str(i)]
    sim.columns  = new_col
    if i!=0:
        sim["log_ret_val_" + str(i)] = sim["log_ret_val_" + str(i)]+sim["log_ret_val_" + str(i-1)] 
    sim["ret_val_" + str(i)] = np.exp(sim["log_ret_val_" + str(i)])
    new_col = new_col + ["ret_val_" + str(i)]
sim["prob"] = 1
for i in range(TIME_STEPS):
    sim["prob"] = sim["prob"]* sim["q_prob_" + str(i)]
for i in range(TIME_STEPS):
    sim["b_crossed_" + str(i)] = sim["log_ret_val_" + str(i)] < round_factor(b_norm)

sim["bin_1_activate"] = sim["log_ret_val_0"] > round_factor(K_bin_norm[0])
sim["bin_2_activate"] = sim["log_ret_val_1"] > round_factor(K_bin_norm[1])

sim["K_put"] = sim["log_ret_val_" + str(TIME_STEPS-1)] < round_factor(K_norm)

sim["payoff"] = 0.0

sim.loc[sim["bin_1_activate"],"payoff"] = bin_1_payoff
sim.loc[(~sim["bin_1_activate"]) & (sim["bin_2_activate"]),"payoff"] = bin_2_payoff

barrier_crossed_once = sim["b_crossed_0"]
for i in range(1 , TIME_STEPS):
    barrier_crossed_once = barrier_crossed_once | sim["b_crossed_" + str(i)]

put_condition = (~((sim["bin_1_activate"]) | (sim["bin_2_activate"]))) & (barrier_crossed_once) & (sim["K_put"])

sim.loc[put_condition,"payoff"] = S*(sim[put_condition]["ret_val_2"]-K_comp)*np.exp(-r*TIME_STEPS)
expected_payoff = sum(sim["prob"]*sim["payoff"])
print("expected payoff classical: " + str(expected_payoff))

Integration Method circuit synthesis

from classiq import *
from classiq.qmod.symbolic import sqrt
from classiq.execution import ExecutionPreferences


@qfunc
def integrator(exp_rate: CReal, x: QNum, ref: QNum, res: QBit) -> None:
    prepare_exponential_state(-exp_rate, ref)
    res ^= x >= ref
def affine_py(x: QNum):
    return MU*dt + SIGMA*sqrt(dt)*(x*STEP_X + MIN_X) 
@qfunc
def add_minimum(x: QArray):
    X(x[x.size-1])
round_K_bin_norm =[]
round_b_norm = round_factor(b_norm)
round_K_bin_norm.append(round_factor(K_bin_norm[0]))
round_K_bin_norm.append(round_factor(K_bin_norm[1]))
round_k_norm = round_factor(K_norm)
@qfunc
def integration_load_amplitudes(y: QNum, aux_reg: QNum, ind_reg: QBit):
    exp_rate = (1 / (2**PRECISION))
    integrator(exp_rate, y, aux_reg, ind_reg)

@qfunc
def integration_payoff(log_return: QNum, aux_reg: QNum, ind_reg: QBit):
    log_return_unsigned = QNum("log_return_unsigned", log_return.size, UNSIGNED, 0)
    bind(log_return, log_return_unsigned)
    add_minimum(log_return_unsigned)
    integration_load_amplitudes(log_return_unsigned, aux_reg, ind_reg)
    add_minimum(log_return_unsigned)
    bind(log_return_unsigned, log_return)

@qfunc
def check_barrier_crossed(log_return: QNum , barrier_crossed : QBit):
    barrier_crossed ^= log_return < round_b_norm

@qfunc
def check_binary(log_return: QNum, round_K_bin_norm: CReal, binary_valid : QBit):
    binary_valid ^= log_return > round_K_bin_norm
@qfunc
def check_K_put(log_return: QNum[PRECISION+int_places, SIGNED, PRECISION] , k_put_valid : QBit):
    k_put_valid ^= log_return < round_k_norm

@qfunc
def binary_payoff(binary_payoff: CReal, target: QBit):
    RY(binary_payoff, target)

@qfunc
def zero_payoff(target: QBit):
    RY(zero_rotation, target)

@qfunc
def check_put_activate(barriers_crossed: QArray, k_put_valid: QBit, put_alone_valid:QBit):
    put_alone_valid ^= ((
        (barriers_crossed[0] ==1)
        | (barriers_crossed[1] ==1)
        | (barriers_crossed[2]==1)
    ) & (k_put_valid==1))==1

@qfunc
def populate_variable_for_conditions_check(
        sum_log_return: QNum, 
        barriers_crossed:QArray,
        first_bin_valid: Input[QBit], second_bin_valid:Input[QBit], 
        check_all_zeros: Output[QNum[3, False, 0]]
):
    k_put_valid = QBit("k_put_valid")

    put_alone_valid = QBit("put_alone_valid")
    allocate(1, put_alone_valid)

    within_apply(
        lambda : [
            allocate(1, k_put_valid),
            check_K_put(sum_log_return, k_put_valid),
            check_barrier_crossed(sum_log_return, barriers_crossed[2]) #magari qui si può condizionare
        ],
        lambda: check_put_activate(barriers_crossed, k_put_valid, put_alone_valid)
    )

    bind([put_alone_valid, first_bin_valid, second_bin_valid], check_all_zeros)

@qfunc
def final_payoff(check_all_zeros: QNum, sum_log_return:QNum, aux_reg:QNum, ind_reg:QBit):
    control(check_all_zeros==0, lambda: zero_payoff(ind_reg))
    control(check_all_zeros==1, lambda: integration_payoff(sum_log_return, aux_reg, ind_reg))


@qfunc
def autocallable_integration(
    x: QArray[QNum, TIME_STEPS],
    aux_reg: QNum,
    ind_reg: QBit,
    sum_log_return: QNum,
    first_bin_valid: QBit,
    second_bin_valid: QBit,
    barriers_crossed: QArray[QBit]
) -> None:
    repeat(x.len, lambda i: inplace_prepare_state(probabilities, 0, x[i]))

    sum_log_return ^= affine_py(x[0])
    check_binary(sum_log_return, round_K_bin_norm[0], first_bin_valid)
    control(first_bin_valid, lambda: binary_payoff(bin_1_payoff_rotation, ind_reg))
    #check_barrier_crossed(sum_log_return, barriers_crossed[0])
    check_barrier_crossed(sum_log_return, barriers_crossed[0])

    sum_log_return += affine_py(x[1])
    check_binary(sum_log_return, round_K_bin_norm[1], second_bin_valid)
    control( ((first_bin_valid==0) & (second_bin_valid==1))==1, lambda: binary_payoff(bin_2_payoff_rotation, ind_reg)) #check order
    #check_barrier_crossed(sum_log_return, barriers_crossed[1]) #magari qui si può condizionare
    check_barrier_crossed(sum_log_return, barriers_crossed[1])

    sum_log_return += affine_py(x[2])

    check_all_zeros = QNum("check_all_zeros", 3)
    within_apply(
        lambda: populate_variable_for_conditions_check(sum_log_return, barriers_crossed, first_bin_valid, second_bin_valid, check_all_zeros),
                 lambda: final_payoff(check_all_zeros, sum_log_return, aux_reg, ind_reg)
    )

IQAE functions and QStruct

from classiq import QStruct, CInt, cfunc, power, grover_operator, Z, iqae, save


class OracleVars(QStruct):
    x: QArray[QNum[NUM_QUBITS, False, 0], TIME_STEPS]
    aux_reg: QNum[int_places+PRECISION, False, 0]
    sum_log_return: QNum[PRECISION+int_places, SIGNED, PRECISION]
    first_bin_valid: QBit
    second_bin_valid: QBit
    barriers_crossed: QArray[QBit, TIME_STEPS]

class IQAEVars(QStruct):
    state: OracleVars
    ind_reg: QBit

@qfunc
def iqae_oracle(vars: IQAEVars):
    Z(vars.ind_reg)

@qfunc
def iqae_state_preparation(iqae_vars: IQAEVars):
    autocallable_integration(
        iqae_vars.state.x,
        iqae_vars.state.aux_reg,
        iqae_vars.ind_reg,
        iqae_vars.state.sum_log_return,
        iqae_vars.state.first_bin_valid,
        iqae_vars.state.second_bin_valid,
        iqae_vars.state.barriers_crossed,
    )

@qfunc
def main(
    k: CInt,
    ind_reg: Output[QBit],
) -> None:
    iqae_vars = IQAEVars("iqae_vars")
    allocate(NUM_QUBITS*TIME_STEPS+((int_places+PRECISION)*2)+2+ TIME_STEPS+1, iqae_vars)
    iqae_state_preparation(iqae_vars)
    power(
        k,
        lambda: grover_operator(
            oracle=iqae_oracle,
            space_transform=iqae_state_preparation,
            packed_vars=iqae_vars,
        )
    )
    state_reg = OracleVars("state_reg")
    bind(iqae_vars, [state_reg, ind_reg])

@cfunc
def cmain():
    iqae_res = iqae(epsilon=0.001, alpha=0.002)
    save({"iqae_res": iqae_res})

Base simulator sythesis

from time import time
start_time = time()
constraints = Constraints(optimization_parameter="width")
qmod = create_model(
    main, 
    constraints=constraints, 
    classical_execution_function=cmain, 
    preferences=Preferences(machine_precision=PRECISION, timeout_seconds =2000 ),
    execution_preferences = ExecutionPreferences(shots=100000)
)
print("Starting synthesis")
qprog = synthesize(qmod)
show(qprog)
from classiq import QuantumProgram
circuit = QuantumProgram.from_qprog(qprog)
print("Circuit width: ", circuit.data.width)

Execution takes a lot of time

See the results below

# job = execute(qprog)
# res_iqae= job.result_value()
# print("the expected result from classical computation is: " + str(expected_payoff))
# print("the result from IQAE is: " + str(postprocessing(res_iqae.estimation)))
# print("the confidence interval of the quantum estimation is: " + str(postprocessing(res_iqae.confidence_interval[0]))+"," +str(postprocessing(res_iqae.confidence_interval[1])) )
# print(f"Synthesis and execution time: {time() - start_time} seconds")
print("the expected result from classical computation is: " + str(expected_payoff))
print("the result from IQAE is: " + str(-3.5079217726515903))
print("the confidence interval of the quantum estimation is: " + str(postprocessing(0.119158741))+"," +str(postprocessing(0.1197666)))