Autocallables with Integration Amplitude Loading
Data Definitions
import numpy as np
import scipy
S = 18 # notional value (can be asset initial price)
dt = 1 #in year
NUM_MARKET_DAYS_IN_YEAR = 250
DAILY_SIGMA = 0.0150694 #daily std log return
DAILY_MU = 0.00050963 #daily mean log return
SIGMA = DAILY_SIGMA*np.sqrt(NUM_MARKET_DAYS_IN_YEAR) #annual
MU = DAILY_MU*NUM_MARKET_DAYS_IN_YEAR #annual
b = 0.7 # binary barrier (to check with returns)
K_comp = 1 # K put (to check with returns)
K = K_comp*S # K put (to to use for the payoff)
r= 0.04 #annual risk free rate
K_bin_1 = 1.1 # K binaries (to check with returns)
K_bin_2 = 1.1 # K binaries (to check with returns)
K_bin_norm = [np.log(K_bin_1), np.log(K_bin_2)] # K binaries (to check with log returns)
bin_1_payoff = 2*np.exp(-r*1) # payoff binaries already discounted
bin_2_payoff = 3*np.exp(-r*2) # payoff binaries already discounted
NUM_QUBITS = 2
TIME_STEPS = 3 # 3 years
PRECISION = 2
NUM_QUBITS*TIME_STEPS
Gaussian State preparation
def gaussian_discretization(num_qubits, mu=0, sigma=1, stds_around_mean_to_include=3):
lower = mu - stds_around_mean_to_include * sigma
upper = mu + stds_around_mean_to_include * sigma
num_of_bins = 2**num_qubits
sample_points = np.linspace(lower, upper, num_of_bins + 1)
def single_gaussian(x: np.ndarray, _mu: float, _sigma: float) -> np.ndarray:
cdf = scipy.stats.norm.cdf(x, loc=_mu, scale=_sigma)
return cdf[1:] - cdf[0:-1]
non_normalized_pmf = (single_gaussian(sample_points, mu, sigma),)
real_probs = non_normalized_pmf / np.sum(non_normalized_pmf)
return sample_points[:-1], real_probs[0].tolist()
grid_points, probabilities = gaussian_discretization(NUM_QUBITS, stds_around_mean_to_include=3)
STEP_X = grid_points[1] - grid_points[0]
MIN_X = grid_points[0]
Rescalings
Compute \(R_T^{max}\) resulting from discretization.
R_T_MAX_PROP = 0
if MU>0 and SIGMA>0:
R_T_MAX_PROP = TIME_STEPS*(MU*dt + SIGMA*np.sqrt(dt)*(grid_points[-1]))
R_T_MIN_PROP = 0
if MU>0 and SIGMA>0:
R_T_MIN_PROP = TIME_STEPS*(MU*dt + SIGMA*np.sqrt(dt)*(grid_points[0]))
R_T_MAX_PROP = np.max([np.abs(R_T_MIN_PROP), np.abs(R_T_MAX_PROP)])
R_T_MAX = np.log(np.max([(np.exp(TIME_STEPS*r)*bin_2_payoff)+K, (np.exp(TIME_STEPS*r)*bin_1_payoff)+K, K])/S)
R_T_MAX = max(R_T_MAX_PROP, R_T_MAX)
In two's complement, given \(N\) as number of qubits, we can represent from \(-2^{N-1}\) and \(2^{N-1}-1\).
a = 1/(2**PRECISION)
if R_T_MAX < 1:
int_places = 1
else:
int_places = np.ceil(np.log2(np.ceil(R_T_MAX))) + 1
num_norm_factor = ((np.exp(a*(2**(int_places+PRECISION)))-1))
den_norm_factor = (np.exp(a*(2**(int_places+PRECISION-1)+1)))
norm_factor = S*(num_norm_factor/den_norm_factor)
Compute constant rotations
def compute_constant_rotation(const_payoff):
b1 = const_payoff*np.exp(r*TIME_STEPS)
num1 = (b1 + K)*den_norm_factor
den1 = S*num_norm_factor
den2 = num_norm_factor
num2 = 1
angle = (num1/den1) - (num2/den2)
return 2*np.arcsin(np.sqrt(angle))
bin_1_payoff_rotation= compute_constant_rotation(bin_1_payoff)
bin_2_payoff_rotation= compute_constant_rotation(bin_2_payoff)
zero_rotation = compute_constant_rotation(0)
bit_T_normalize = (S*np.exp(-r*TIME_STEPS))/den_norm_factor
def postprocessing(x):
return (x*norm_factor*np.exp(-r*TIME_STEPS)) + bit_T_normalize - (K*np.exp(-r*TIME_STEPS))
Verifications
bin_1_payoff
postprocessing((np.sin((compute_constant_rotation(bin_1_payoff)/2)))**2)
bin_2_payoff
postprocessing((np.sin((compute_constant_rotation(bin_2_payoff)/2)))**2)
postprocessing((np.sin((compute_constant_rotation(0)/2)))**2)
Compute constants for comparisons
b_norm = np.log(b)
K_norm = np.log(K_comp)
Classical Payoff
import pandas as pd
from itertools import product
simulation = pd.DataFrame.from_dict({"quantum_samples": list(range(len(grid_points))) ,"classical_samples": grid_points, "probabilities": probabilities} )
binary_combinations = list(product( range(2**NUM_QUBITS), repeat=TIME_STEPS))
sim = pd.DataFrame(binary_combinations)
new_col = ["time_" +str(i) for i in range(TIME_STEPS)]
sim.columns = new_col
def round_down(a):
precision_factor = 2 ** (PRECISION)
return np.floor(a * precision_factor) / precision_factor
def round_factor(a):
#precision_factor = 2 ** (PRECISION)
#return np.round(a * precision_factor) / precision_factor
return floor_factor(a)
def floor_factor(a):
precision_factor = 2 ** (PRECISION)
if a >= 0:
return np.floor(a * precision_factor) / precision_factor
else:
return np.ceil(a * precision_factor) / precision_factor
for i in range(TIME_STEPS):
sim = sim.merge(simulation, left_on="time_" + str(i) , right_on="quantum_samples")
sim = sim.drop("quantum_samples", axis=1)
new_col = new_col + ["c_" + str(i), "q_prob_" + str(i)]
sim.columns = new_col
sim["log_ret_val_" + str(i)] = round_factor(MU*dt + np.sqrt(dt)*SIGMA*MIN_X) + round_factor(SIGMA*np.sqrt(dt)*STEP_X) * sim["time_" + str(i)]
new_col = new_col + ["log_ret_val_" + str(i)]
sim.columns = new_col
if i!=0:
sim["log_ret_val_" + str(i)] = sim["log_ret_val_" + str(i)]+sim["log_ret_val_" + str(i-1)]
sim["ret_val_" + str(i)] = np.exp(sim["log_ret_val_" + str(i)])
new_col = new_col + ["ret_val_" + str(i)]
sim["prob"] = 1
for i in range(TIME_STEPS):
sim["prob"] = sim["prob"]* sim["q_prob_" + str(i)]
for i in range(TIME_STEPS):
sim["b_crossed_" + str(i)] = sim["log_ret_val_" + str(i)] < round_factor(b_norm)
sim["bin_1_activate"] = sim["log_ret_val_0"] > round_factor(K_bin_norm[0])
sim["bin_2_activate"] = sim["log_ret_val_1"] > round_factor(K_bin_norm[1])
sim["K_put"] = sim["log_ret_val_" + str(TIME_STEPS-1)] < round_factor(K_norm)
sim["payoff"] = 0.0
sim.loc[sim["bin_1_activate"],"payoff"] = bin_1_payoff
sim.loc[(~sim["bin_1_activate"]) & (sim["bin_2_activate"]),"payoff"] = bin_2_payoff
barrier_crossed_once = sim["b_crossed_0"]
for i in range(1 , TIME_STEPS):
barrier_crossed_once = barrier_crossed_once | sim["b_crossed_" + str(i)]
put_condition = (~((sim["bin_1_activate"]) | (sim["bin_2_activate"]))) & (barrier_crossed_once) & (sim["K_put"])
sim.loc[put_condition,"payoff"] = S*(sim[put_condition]["ret_val_2"]-K_comp)*np.exp(-r*TIME_STEPS)
expected_payoff = sum(sim["prob"]*sim["payoff"])
print("expected payoff classical: " + str(expected_payoff))
Integration Method circuit synthesis
from classiq import *
from classiq.qmod.symbolic import sqrt
from classiq.execution import ExecutionPreferences
@qfunc
def integrator(exp_rate: CReal, x: QNum, ref: QNum, res: QBit) -> None:
prepare_exponential_state(-exp_rate, ref)
res ^= x >= ref
def affine_py(x: QNum):
return MU*dt + SIGMA*sqrt(dt)*(x*STEP_X + MIN_X)
@qfunc
def add_minimum(x: QArray):
X(x[x.size-1])
round_K_bin_norm =[]
round_b_norm = round_factor(b_norm)
round_K_bin_norm.append(round_factor(K_bin_norm[0]))
round_K_bin_norm.append(round_factor(K_bin_norm[1]))
round_k_norm = round_factor(K_norm)
@qfunc
def integration_load_amplitudes(y: QNum, aux_reg: QNum, ind_reg: QBit):
exp_rate = (1 / (2**PRECISION))
integrator(exp_rate, y, aux_reg, ind_reg)
@qfunc
def integration_payoff(log_return: QNum, aux_reg: QNum, ind_reg: QBit):
log_return_unsigned = QNum("log_return_unsigned", log_return.size, UNSIGNED, 0)
bind(log_return, log_return_unsigned)
add_minimum(log_return_unsigned)
integration_load_amplitudes(log_return_unsigned, aux_reg, ind_reg)
add_minimum(log_return_unsigned)
bind(log_return_unsigned, log_return)
@qfunc
def check_barrier_crossed(log_return: QNum , barrier_crossed : QBit):
barrier_crossed ^= log_return < round_b_norm
@qfunc
def check_binary(log_return: QNum, round_K_bin_norm: CReal, binary_valid : QBit):
binary_valid ^= log_return > round_K_bin_norm
@qfunc
def check_K_put(log_return: QNum[PRECISION+int_places, SIGNED, PRECISION] , k_put_valid : QBit):
k_put_valid ^= log_return < round_k_norm
@qfunc
def binary_payoff(binary_payoff: CReal, target: QBit):
RY(binary_payoff, target)
@qfunc
def zero_payoff(target: QBit):
RY(zero_rotation, target)
@qfunc
def check_put_activate(barriers_crossed: QArray, k_put_valid: QBit, put_alone_valid:QBit):
put_alone_valid ^= ((
(barriers_crossed[0] ==1)
| (barriers_crossed[1] ==1)
| (barriers_crossed[2]==1)
) & (k_put_valid==1))==1
@qfunc
def populate_variable_for_conditions_check(
sum_log_return: QNum,
barriers_crossed:QArray,
first_bin_valid: Input[QBit], second_bin_valid:Input[QBit],
check_all_zeros: Output[QNum[3, False, 0]]
):
k_put_valid = QBit("k_put_valid")
put_alone_valid = QBit("put_alone_valid")
allocate(1, put_alone_valid)
within_apply(
lambda : [
allocate(1, k_put_valid),
check_K_put(sum_log_return, k_put_valid),
check_barrier_crossed(sum_log_return, barriers_crossed[2]) #magari qui si può condizionare
],
lambda: check_put_activate(barriers_crossed, k_put_valid, put_alone_valid)
)
bind([put_alone_valid, first_bin_valid, second_bin_valid], check_all_zeros)
@qfunc
def final_payoff(check_all_zeros: QNum, sum_log_return:QNum, aux_reg:QNum, ind_reg:QBit):
control(check_all_zeros==0, lambda: zero_payoff(ind_reg))
control(check_all_zeros==1, lambda: integration_payoff(sum_log_return, aux_reg, ind_reg))
@qfunc
def autocallable_integration(
x: QArray[QNum, TIME_STEPS],
aux_reg: QNum,
ind_reg: QBit,
sum_log_return: QNum,
first_bin_valid: QBit,
second_bin_valid: QBit,
barriers_crossed: QArray[QBit]
) -> None:
repeat(x.len, lambda i: inplace_prepare_state(probabilities, 0, x[i]))
sum_log_return ^= affine_py(x[0])
check_binary(sum_log_return, round_K_bin_norm[0], first_bin_valid)
control(first_bin_valid, lambda: binary_payoff(bin_1_payoff_rotation, ind_reg))
#check_barrier_crossed(sum_log_return, barriers_crossed[0])
check_barrier_crossed(sum_log_return, barriers_crossed[0])
sum_log_return += affine_py(x[1])
check_binary(sum_log_return, round_K_bin_norm[1], second_bin_valid)
control( ((first_bin_valid==0) & (second_bin_valid==1))==1, lambda: binary_payoff(bin_2_payoff_rotation, ind_reg)) #check order
#check_barrier_crossed(sum_log_return, barriers_crossed[1]) #magari qui si può condizionare
check_barrier_crossed(sum_log_return, barriers_crossed[1])
sum_log_return += affine_py(x[2])
check_all_zeros = QNum("check_all_zeros", 3)
within_apply(
lambda: populate_variable_for_conditions_check(sum_log_return, barriers_crossed, first_bin_valid, second_bin_valid, check_all_zeros),
lambda: final_payoff(check_all_zeros, sum_log_return, aux_reg, ind_reg)
)
IQAE functions and QStruct
from classiq import QStruct, CInt, cfunc, power, grover_operator, Z, iqae, save
class OracleVars(QStruct):
x: QArray[QNum[NUM_QUBITS, False, 0], TIME_STEPS]
aux_reg: QNum[int_places+PRECISION, False, 0]
sum_log_return: QNum[PRECISION+int_places, SIGNED, PRECISION]
first_bin_valid: QBit
second_bin_valid: QBit
barriers_crossed: QArray[QBit, TIME_STEPS]
class IQAEVars(QStruct):
state: OracleVars
ind_reg: QBit
@qfunc
def iqae_oracle(vars: IQAEVars):
Z(vars.ind_reg)
@qfunc
def iqae_state_preparation(iqae_vars: IQAEVars):
autocallable_integration(
iqae_vars.state.x,
iqae_vars.state.aux_reg,
iqae_vars.ind_reg,
iqae_vars.state.sum_log_return,
iqae_vars.state.first_bin_valid,
iqae_vars.state.second_bin_valid,
iqae_vars.state.barriers_crossed,
)
@qfunc
def main(
k: CInt,
ind_reg: Output[QBit],
) -> None:
iqae_vars = IQAEVars("iqae_vars")
allocate(NUM_QUBITS*TIME_STEPS+((int_places+PRECISION)*2)+2+ TIME_STEPS+1, iqae_vars)
iqae_state_preparation(iqae_vars)
power(
k,
lambda: grover_operator(
oracle=iqae_oracle,
space_transform=iqae_state_preparation,
packed_vars=iqae_vars,
)
)
state_reg = OracleVars("state_reg")
bind(iqae_vars, [state_reg, ind_reg])
@cfunc
def cmain():
iqae_res = iqae(epsilon=0.001, alpha=0.002)
save({"iqae_res": iqae_res})
Base simulator sythesis
from time import time
start_time = time()
constraints = Constraints(optimization_parameter="width")
qmod = create_model(
main,
constraints=constraints,
classical_execution_function=cmain,
preferences=Preferences(machine_precision=PRECISION, timeout_seconds =2000 ),
execution_preferences = ExecutionPreferences(shots=100000)
)
print("Starting synthesis")
qprog = synthesize(qmod)
show(qprog)
from classiq import QuantumProgram
circuit = QuantumProgram.from_qprog(qprog)
print("Circuit width: ", circuit.data.width)
Execution takes a lot of time
See the results below
# job = execute(qprog)
# res_iqae= job.result_value()
# print("the expected result from classical computation is: " + str(expected_payoff))
# print("the result from IQAE is: " + str(postprocessing(res_iqae.estimation)))
# print("the confidence interval of the quantum estimation is: " + str(postprocessing(res_iqae.confidence_interval[0]))+"," +str(postprocessing(res_iqae.confidence_interval[1])) )
# print(f"Synthesis and execution time: {time() - start_time} seconds")
print("the expected result from classical computation is: " + str(expected_payoff))
print("the result from IQAE is: " + str(-3.5079217726515903))
print("the confidence interval of the quantum estimation is: " + str(postprocessing(0.119158741))+"," +str(postprocessing(0.1197666)))