파이썬

파이썬_주식데이터를 활용한 회귀 신경망 스터디 4. 회귀 신경망 응용편

크롬볼쯔 2018. 12. 15. 13:16

회귀 신경망 스터디의 마지막 글입니다.

앞에서 배운 크롤링 + 회귀 신경망을 활용하여 

주식 데이터를 가져와서 예측해보는 실습을 하도록하겠습니다.

이전 글에서 작성한 코드들을 활용하여 진행하도록 하겠습니다.




1. 주식 데이터를 크롤링하고 분서에 적합한 형태로 변환하기


파이썬_주식데이터를 활용한 회귀 신경망 스터디 2.회귀분석실습(http://cromboltz.tistory.com/22?category=626018)


위의 글에서 3.3까지 진행한 결과를 활용하여 진행하도록 하겠습니다.

제대로 따라오셨다면 데이터는 이렇게 되어 있을 것입니다.





x(입력 데이터)와 y(예측 데이터)를 결정해줘야합니다.

이전 글을 보니 변동폭으로 거래량을 예측했었군요.

이번에도 똑같이 진행하도록 하겠습니다.



정규화하기

입력 데이터와 예측 데이터를 정규화해주어야 합니다. 

정규화란 두 데이터의 스케일을 동일하게 맞춘다고 생각하시면 될 것 같습니다.

여러가지 방법이 있지만 여기에서는 각각의 값들이 최대값과 비교하여 어느 정도의 크기를 가지는지로 변환하도록 하겠습니다.

#정규화하기

data_x = stock_price_all_df["변동폭"].astype(float)/max(stock_price_all_df["변동폭"].astype(float))

data_y = stock_price_all_df["거래량"].astype(float)/max(stock_price_all_df["거래량"].astype(float))

어떻게 바뀌었는지 확인해볼까요?



최대값이 1, 최소값이 0을 갖는 값으로 변환이 되었음을 확인할 수 있습니다.



행렬 형태로 변환하기


회귀 신경망에 넣기 위해서는 행렬 형태로 변환을 해줘야 합니다. 방법은 다음과 같습니다.

#행렬 형태로 변환하기

data_x = np.array(data_x)[:, np.newaxis] data_y = np.array(data_y)[:, np.newaxis]

결과는 다음과 같습니다.


상수 넣기

상수는 대단히 유용한 정보입니다. x의 중요도를 감소시키고 모델의 파워를 증가시키는 방법 중 하나 입니다.

꼭 포함하시길 바랍니다.

#상수 넣기

data_x = np.hstack((np.ones_like(data_x), data_x))

결과는 다음과 같습니다.


데이터 셔플 및 훈련 테스트 셋으로 나누기

데이터의 순서 효과를 없애기 위해 셔플한 후, 훈련8 테스트2의 비율로 나눕니다.

#데이터 셔플하기

order = np.random.permutation(len(data_x)) portion = int(len(data_x)*(20/100)) test_x = data_x[order[:portion]] test_y = data_y[order[:portion]] train_x = data_x[order[portion:]] train_y = data_y[order[portion:]]


설정한 비율대로 훈련 데이터와 테스트 데이터가 나뉘었음을 확인할 수 있습니다.



회귀 신경망 분석 수행

회귀 신경망 분석을 수행합니다.

설정한 tolerance보다 작아질 때까지 반복하도록 설정해놓습니다.

w = np.random.randn(2) #2*1의 w 생성 alpha = 0.01 tolerance = 0.00001 # 그라디언트 하강 수행 iterations = 1 while True: # 그라디언트만큼 감소시키고 추정파라미터 갱신 gradient, error = get_gradient(w, train_x, train_y) new_w = w - alpha * gradient # 이전 예측 모델과 새로운 예측 모델의 차이가 매우 작으면 수렴으로 가정 if np.sum(abs(new_w - w)) < tolerance: print("수렴") break # 반복 250회마다 오차와 현재 그래프 출력 if iterations % 2500 == 0: plt.plot(data_x[:, 1], data_x.dot(w), c='g', label='Model') plt.scatter(train_x[:, 1], train_y, c='b', label='Train Set') plt.scatter(test_x[:, 1], test_y, c='r', label='test Set') plt.grid() plt.legend(loc='best') plt.xlabel('X') plt.ylabel('Y') plt.show() print('Iteration: %d - Error: %.4f' % (iterations, error)) iterations += 1 w = new_w


약 2,500 반복했을 때마다 그래프와 에러율을 프린트해보았습니다.


5,000번 정도 반복했을 때까지는 학습 데이터와도 모델이 잘 맞지 않는 모습을 보이다가

학습을 반복할수록 점점 모델이 그린 선과 학습 + 테스트 데이터의 분포도가 유사해짐을 확인할 수 있습니다.(나이스)





튜닝하여 다른 곳에 응용할 수도 있도록 전체 코드도 함께 올려놓도록 하겠습니다.


#라이브러리 호출 import pandas as pd import statsmodels.api as sm import numpy as np from pandas import Series, DataFrame import requests from bs4 import BeautifulSoup import matplotlib.pyplot as plt %matplotlib inline



#주식데이터 크롤링

corp_code = '005930' for page in range(1, 13): url_tmp = 'http://finance.naver.com/item/sise_day.nhn?code=%s&page=%s' url = url_tmp % (corp_code, page) html = requests.get(url).text html = html.replace('\n', '') html = html.replace('\t', '') html = html.replace(',', '') html = html.replace('.', '-') soup = BeautifulSoup(html, 'lxml') span_data = soup.find_all('span') td_data = soup.find_all('td') tr_data = soup.find_all('tr') th_data = soup.find_all('th') if page == 1: columns_list = [] for i in range(len(th_data)): i_text = th_data[i].text columns_list.append(i_text) stock_price_all_array = np.array([columns_list]) rest_page = 0 stock_price_list = [] for i in range(1, len(span_data)): a = span_data[i].text stock_price_list.append(a) if page > 1 and stock_price_list[0] == stock_price_all_array[-10,0]: rest_page = (30 - page)*10 stock_price_array = np.zeros((rest_page,7)) stock_price_all_array = np.append(stock_price_all_array, stock_price_array, axis=0) break if i % 7 == 0: stock_price_array = np.array([stock_price_list]) stock_price_all_array = np.append(stock_price_all_array, stock_price_array, axis=0) stock_price_list = [] if rest_page: break elif len(span_data) < 70: rest_row = int((70 - len(span_data))/7+1) stock_price_array = np.zeros((rest_row, 7)) stock_price_all_array = np.append(stock_price_all_array, stock_price_array, axis=0) rest_page = (13 - page-1)*10 stock_price_array = np.zeros((rest_page,7)) stock_price_all_array = np.append(stock_price_all_array, stock_price_array, axis=0) break stock_price_all_array = np.delete(stock_price_all_array, 0, 0) stock_price_all_df = DataFrame(stock_price_all_array, columns=columns_list).set_index('날짜') stock_price_all_df = stock_price_all_df.reset_index()



#금일 데이터 제거(아웃라이어)

stock_price_all_df=stock_price_all_df[1:].reset_index(drop=True)


#날짜를 제외한 다른 데이터를 수치(int or float) 타입으로 변환하기

stock_price_all_df.iloc[:, 1:6]= stock_price_all_df.iloc[:, 1:6].astype(float)


#변동폭 계산하기

stock_price_all_df["변동폭"] = stock_price_all_df["고가"] - stock_price_all_df["저가"]


#정규화하기 data_x = stock_price_all_df["변동폭"].astype(float)/max(stock_price_all_df["변동폭"].astype(float)) data_y = stock_price_all_df["거래량"].astype(float)/max(stock_price_all_df["거래량"].astype(float))


#행렬타입으로 변환하기

data_x = np.array(data_x)[:, np.newaxis] data_y = np.array(data_y)[:, np.newaxis]


#상수넣기

data_x = np.hstack((np.ones_like(data_x), data_x))


#데이터 셔플하고 훈련 테스트 데이터로 나누기 order = np.random.permutation(len(data_x)) portion = int(len(data_x)*(20/100)) test_x = data_x[order[:portion]] test_y = data_y[order[:portion]] train_x = data_x[order[portion:]] train_y = data_y[order[portion:]]



#기울기 및 에러 계산하는 함수 정의

def get_gradient(w, x, y): y_estimate = x.dot(w) error = (y.flatten() - y_estimate) mse = (1.0 / len(x)) * np.sum(np.power(error, 2)) gradient = -(1.0 / len(x)) * error.dot(x) return gradient, mse




#회귀 신경망 수행

w = np.random.randn(2) #임의의 가중치 생성 alpha = 0.01 # 학습률 tolerance = 0.00001 #기준 에러 차이 설정

# 그라디언트 하강 수행 iterations = 1 while True: # 그라디언트만큼 감소시키고 추정파라미터 갱신 gradient, error = get_gradient(w, train_x, train_y) new_w = w - alpha * gradient # 이전 예측 모델과 새로운 예측 모델의 차이가 매우 작으면 수렴으로 가정 if np.sum(abs(new_w - w)) < tolerance: print("수렴") break # 반복 250회마다 오차와 현재 그래프 출력 if iterations % 2500 == 0: plt.plot(data_x[:, 1], data_x.dot(w), c='g', label='Model') plt.scatter(train_x[:, 1], train_y, c='b', label='Train Set') plt.scatter(test_x[:, 1], test_y, c='r', label='test Set') plt.grid() plt.legend(loc='best') plt.xlabel('X') plt.ylabel('Y') plt.show() print('Iteration: %d - Error: %.4f' % (iterations, error)) iterations += 1 w = new_w