chainerのディープラーニング(多層パーセプトロン?)をscikit-learn風味に

おひさです

お久しぶりです。
忙しすぎて「ブログに勉強したことまとめっぞ!」と思っていたのに三日坊主になりかけてました(笑)。
最近研究が軌道に乗ってきたようなそうでもないような…
リア充ではないけどラボ充です( ̄▽ ̄)

はやりのディープラーニングをscikit-learn風にしちゃいました

はい。しちゃいました。

僕の研究内容はハイパーパラメータの推定手法で、ディープラーニングは専門ではないのですが、ご存知ディープラーニングはハイパーパラメータがアホみたいにたくさんあるので、なにかとハイパーパラメータ推定の実験材料になったり、今しているアルバイトでディープラーニングによる分類モデルを作成したりとなにかと作る機会があって、「これいちいち作るのめんどいなあ」と思っていました。

それならばもうハイパーパラメータを調整する部分をある程度一か所にまとめておいて、scikit-learnっぽく使えるようにしておこうと思い下のプログラムを作成しました。(ほんとはなんか別のクラスを作成することでそんなことができるらしいのですが、まあモデル作成の手間を省きたかったので)
使用例はメイン関数内に書いてあるのでわかると思います。
ほんとうはモデル調整部分を一か所にまとめておきたかったのですが…思うことあって__init__とforwardとfitのしょっぱなのself.modelの部分に分散してあります。
ここの部分を書き換えれば、基本的なニューラルネットならどんな形でも対応可能なんじゃないかなあ。
もっとうまい書き方ありそうですが悪しからず!


なお、本プログラムの作成にあたりこのブログを全面的に参考にさせていただきました。
「あんまりディープラーニング知らない…」みたいな人もこれ見ればすぐわかるんじゃないかなあってくらい親切すぎる解説でとても参考になりました。
qiita.com


試していただけるとわかりますが精度が40%程度とクズです(笑)
調整がめんどいので←
使用する場合はハイパーパラメータの調整は適宜問題にあわせてやってください。
間違いや「こうした方がいいぞ!」などございましたら教えていただけると幸いです。
(*^-^*)

# -*- coding: utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt
from chainer import Variable,FunctionSet,optimizers
import chainer.functions as F
import chainer.links as L
from sklearn.datasets import fetch_mldata
from sklearn.cross_validation import train_test_split

class DeepLearning:
	def __init__(self):
		# バッチサイズ
		self.batchsize = 1000
		# エポック数
		self.n_epoch = 10
		# 最適化手法
		self.optimizer = optimizers.Adam()

	def forward(self, x_data, y_data=None, train=True):
		x = Variable(x_data)
		b1 = self.model.bn1(x)
		h1 = F.dropout(F.relu(self.model.l1(b1)), ratio=0.2, train=train)
		b2 = self.model.bn2(h1)
		h2 = F.dropout(F.relu(self.model.l2(b2)), ratio=0.2, train=train)
		b3 = self.model.bn3(h2)
		y = F.dropout(F.relu(self.model.l3(b3)), ratio=0.2, train=train)

		if train:
			t = Variable(y_data)
			return F.softmax_cross_entropy(y,t), F.accuracy(y,t), y.data

		else:
			return y.data
		
	def fit(self, x_train, y_train):

		self.number_of_label=np.unique(y_train).size
		self.model = FunctionSet(
			bn1 = L.BatchNormalization(x_train.shape[1]),
			l1 = F.Linear(x_train.shape[1], 1000),
			bn2 = L.BatchNormalization(1000),
			l2 = F.Linear(1000, 1000),
			bn3 = L.BatchNormalization(1000),
			l3 = F.Linear(1000, self.number_of_label)
			)

		# トレーニングデータの数
		N = y_train.size
		# optimizer セットアップ
		self.optimizer.setup(self.model.collect_parameters())

		# 念のためトレーニングデータの型合わせ
		x_train = x_train.astype(np.float32)
		y_train = y_train.astype(np.int32)

		# Learning loop
		for epoch in range(1,self.n_epoch+1):
			# エポック数表示
			print 'epoch{}'.format(epoch)

			# training
			# N個の順番をランダムに並び替える
			perm = np.random.permutation(N)

			# ロスと正答率初期化
			sum_accuracy = 0
			sum_loss = 0
			# 0〜Nまでのデータをバッチサイズごとに使って学習
			for i in range(0, N, self.batchsize):
				if i + self.batchsize < N:
					x_batch = x_train[perm[i:i+self.batchsize]]
					y_batch = y_train[perm[i:i+self.batchsize]]

				else:
					x_batch = x_train[perm[i:]]
					y_batch = y_train[perm[i:]]

				# 勾配を初期化
				self.optimizer.zero_grads()
				# 順伝播させて誤差と精度を算出
				loss, acc, answer = self.forward(x_batch, y_batch)
				# 誤差逆伝播で勾配を計算
				loss.backward()
				self.optimizer.update()

				sum_loss     += float(loss.data) * x_batch.shape[0]
				sum_accuracy += float(acc.data) * x_batch.shape[0]

			print 'train mean loss={}, accuracy={}'.format(sum_loss / N, sum_accuracy / N)

	def predict(self, x_test):
		N_test=x_test.shape[0]
		# ロスと正答率初期化
		sum_accuracy = 0
		sum_loss = 0
		answers = np.array([])
		x_test = x_test.astype(np.float32)
		for i in range(0, N_test, self.batchsize):
			if i + self.batchsize < N_test:
				x_batch = x_test[i:i+self.batchsize]

			else:
				x_batch = x_test[i:]
			
			# 順伝播させて出力
			sub_answer = self.forward(x_batch, train=False)
			# 出力最大となるラベルを予測値とする
			sub_answer = np.argmax(sub_answer, axis=1)
			answers = np.append(answers, sub_answer)

		answers=answers.reshape(-1,1)

		return answers

	def predict_proba(self, x_test):
		N_test=x_test.shape[0]
		# ロスと正答率初期化
		sum_accuracy = 0
		sum_loss = 0
		answers = np.zeros((x_test.shape[0], self.number_of_label))
		x_test = x_test.astype(np.float32)
		for i in range(0, N_test, self.batchsize):
			if i + self.batchsize < N_test:
				x_batch = x_test[i:i+self.batchsize]
				answers[i:i+self.batchsize] = self.forward(x_batch, train=False)

			else:
				x_batch = x_test[i:]
				answers[i:] = self.forward(x_batch, train=False)
		
		probability=np.exp(answers)/np.sum(np.exp(answers), axis=1).reshape(-1,1)
		return probability

	def score(self, x_test, y_test):
		predict_y = self.predict(x_test).reshape(-1,)
		wrong_size = np.count_nonzero(predict_y - y_test)
		score=float(y_test.size - wrong_size)/y_test.size
		return score

if __name__=='__main__':
	# 使用例

	# 前処理
	mnist=fetch_mldata('MNIST original',data_home=".")
	data=mnist.data
	data/=data.max()
	target=mnist.target
	# データを訓練用とテスト用に分割
	x_train, x_test, y_train, y_test=train_test_split(data, target, test_size=0.2)

	# 分類器定義
	DL = DeepLearning()
	# トレーニング
	DL.fit(x_train, y_train)

	# 予測
	predict_answer = DL.predict(x_test)
	print predict_answer
	# 各ラベルをとる確率の計算
	probability = DL.predict_proba(x_test)
	print probability
	# テストデータでの正答率
	score = DL.score(x_test, y_test)
	print score