1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
| import pandas as pd import qstock as qs import statsmodels.api as sm from scipy.stats import ttest_1samp
def main(): # 获取并整理数据 stock = ['京沪高铁','上证指数'] merged_data = get_Data(stock) print(merged_data) all_results = pd.DataFrame() t_test_res = pd.DataFrame() event_dates = ['2022-07-23', '2023-07-22', '2024-06-24'] for event_date in event_dates: # 获取估计期及窗口期数据 estimation_data, event_data = extract_event_data(event_date, merged_data) # 正常收益模型 alpha, beta = calculate_regression_coefficients(estimation_data) # 异常收益率及累积异常收益率 car_result = calculate_CAR(event_data, alpha, beta, event_date) all_results = pd.concat([all_results,car_result],ignore_index=True) # 显著性检验 t_test = single_sample_test(car_result, event_date) t_test_res = pd.concat([t_test_res,t_test],ignore_index=True) # 将结果写入excel all_results.to_excel('异常收益率&累积异常收益率.xlsx',index=False) t_test_res.to_excel('显著性检验.xlsx',index=False) # 1.利用qstock包获取数据下载到excel
def get_Data(stock): data = qs.get_data(stock, start='20210101', end='20240930', freq='d', fqt=1) data = data.reset_index()
# 1. 下载并整理数据:计算每日收益率 data['daily_return'] = data['close'].pct_change() # 计算每日收益率 stock_data = data[data['name'] == stock[0]][['date', 'daily_return']].rename(columns={'daily_return': 'stock_return'}) index_data = data[data['name'] == stock[1]][['date', 'daily_return']].rename(columns={'daily_return': 'index_return'})
# 合并数据并清理空值 merged_data = pd.merge(stock_data, index_data, on='date').dropna()
return merged_data
# 2. 提取建模数据和事件窗口数据 def extract_event_data(event_date, data_frame, estimation_window=120,event_window=5):
try: event_date = pd.to_datetime(event_date) data_frame = data_frame.reset_index(drop=True) # 筛选出早于公告日的数据,找到最接近公告日的日期或公告日(包括当天) filtered_data = data_frame[data_frame['date'] <= event_date] event_idx = (filtered_data['date'] - event_date).abs().idxmin() # 如果目标日期是交易日,即目标日期在日期中 # 检查公告日是不是交易日,是的话为true,即数字1,不是的话为0 is_current_day = event_date not in filtered_data['date'].values # 计算估计期和事件窗口范围 start_idx = max(0, event_idx - estimation_window - event_window + int(is_current_day)) estimation_end_idx = event_idx - event_window + int(is_current_day)
# 估计期数据 estimation_data = data_frame.iloc[start_idx:estimation_end_idx] # 事件窗口数据【考虑事件日不是交易日】 event_window_data = data_frame.iloc[event_idx - event_window + int(is_current_day) : event_idx + event_window + 1]
return estimation_data, event_window_data except IndexError: print(f"警告:事件日期 {event_date} 超出数据范围或数据不足。") return None, None
# 3. 构建OLS回归模型,计算回归参数 def calculate_regression_coefficients(estimation_data): """ 使用估计期数据进行OLS回归,计算回归参数。 """ X = sm.add_constant(estimation_data['index_return']) # 添加常数项 y = estimation_data['stock_return'] model = sm.OLS(y, X).fit()
return model.params['const'], model.params['index_return']
# 4. 计算累积异常收益率(CAR) def calculate_CAR(event_data, alpha, beta, event_date): """ 使用事件窗口数据和回归参数计算累积异常收益率(CAR)。 """ event_data = event_data.copy() # 避免修改原始数据 event_data['predicted_return'] = event_data['index_return'] * beta + alpha # 预测收益率 event_data['abnormal_return'] = event_data['stock_return'] - event_data['predicted_return'] # 异常收益率 event_data['cumulative_abnormal_return'] = event_data['abnormal_return'].cumsum() # 累积异常收益率 event_data['event_date'] = event_date
return event_data
# 5.累积超额收益率的单样本检验,检验其显著性是否为0 def single_sample_test(car_data, event_date): """ 对每个事件窗口的累积异常收益率(CAR)进行单样本 t 检验。 """ event_car = car_data[car_data['event_date'] == event_date]['cumulative_abnormal_return']
# 单样本 t 检验,假设均值为 0 t_stat, p_value = ttest_1samp(event_car, popmean=0) # 保存结果 res = { 'event_date': event_date, 'mean_CAR': event_car.mean(), 't_stat': t_stat, 'p_value': p_value, 'significant': p_value < 0.05 # 是否显著 }
return pd.DataFrame([res])
if __name__ == '__main__': main()
|