diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile new file mode 100644 index 0000000..1807e69 --- /dev/null +++ b/.devcontainer/Dockerfile @@ -0,0 +1,23 @@ +FROM python:3.8-slim + +RUN apt-get update && apt-get upgrade -y +RUN apt-get install -y \ + libgl1-mesa-dev \ + libglib2.0-0 \ + libsm6 \ + libxext6 \ + libxrender-dev \ + libx11-dev + +COPY ./requirements.txt ./ + +RUN pip3 install --upgrade pip setuptools +RUN pip3 install --upgrade wheel +RUN pip3 install -r requirements.txt + +ENV DISPLAY=host.docker.internal:0.0 + +WORKDIR /code +ADD . . + +CMD python3 main.py diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..dbac373 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,35 @@ +{ + "name": "sato_container", + // Dockerfileでイメージ・コンテナを作成 + "context": "..", + "dockerFile": "Dockerfile", + // リモート先のVS Codeにインストールする拡張機能 + "extensions": [ + "ms-python.python" + ], + "settings": { + // 構文チェックを行うようにする + "python.linting.enabled": true, + // 保存時に構文チェックを行う + "python.linting.lintOnSave": true, + // 構文チェックをpylintで行わないようにする (※規定値がtrueなため) + "python.linting.pylintEnabled": false, + // 構文チェックをflake8で行うようにする + "python.linting.flake8Enabled": true, + // flake8のパス (※pyproject-flake8を導入している場合はpflake8で上書きする必要がある) + "python.linting.flake8Path": "/usr/local/bin/pflake8", + // mypyを有効化する + "python.linting.mypyEnabled": true, + // フォーマットをblackで行うようにする + "python.formatting.provider": "black", + // pythonファイルの設定 + "[python]": { + // 保存時にimport文のソートなどを行う + "editor.codeActionsOnSave": { + "source.organizeImports": true + }, + // 保存時にフォーマットを行う + "editor.formatOnSave": true + } + }, +} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..51194ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,134 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ 
+develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + + +.vscode/ +.mypy_cache/ +.idea/ diff --git a/ConsolePlay.py b/ConsolePlay.py new file mode 100644 index 0000000..43f84c7 --- /dev/null +++ b/ConsolePlay.py @@ -0,0 +1,91 @@ +import numpy as np +from TicTacToe import TicTacToe + +PLAYER = { + "None": 0, + "First": 1, + "Second": 2 +} + +JUDGE = { + "None": "none", + "OUT_OF_RANGE": "out_of_range", + "OVERLAP": "overlap", + "WIN": "win", + "DRAW": "draw" +} + +class TTTConsole: + def __init__(self, player1s_myturn, player2s_myturn, Is_shown=False): + self.PLAYER_STR = ["", "先手", "後手"] + self.PLAYER_MARK = [" ", "○", "×"] + self.Players_myturn = [None, player1s_myturn, player2s_myturn] + + self._ttt = TicTacToe() + self._showNumber = True + self.Is_shown = Is_shown + + def Run(self): + if self.Is_shown: + self.ShowTitle() + self.Battle() + if self.Is_shown: + self.ShowResult() + return self._ttt.Judge, self._ttt.Player + + def ShowTitle(self): + print("拡張〇×ゲーム") + + def Battle(self): + self._ttt.Init() + while True: + if self.Is_shown: + print("") + self.ShowBoard() + print("{}の番".format(self.PlayerStr())) + self._ttt.Set(self.Players_myturn[self._ttt.Player](self._ttt.GetBoard())) + if self.Is_shown: + print("--> {}".format(self._ttt.LastSet + 1)) + + if self._ttt.Judge != JUDGE["None"]: + break + + def ShowBoard(self): + print("--- ターン", self._ttt.Turn, " ---") + + flip = -1 if self._ttt.Player == PLAYER["Second"] else 1 + for row in range(self._ttt.BOARD_ROWS): + print(" ", end="") + for col in range(self._ttt.BOARD_COLS): + pos = row * self._ttt.BOARD_COLS + col + pIdx = PLAYER["First"] if 
self._ttt.Board[pos] * flip > 0 else PLAYER["Second"] if self._ttt.Board[pos] * flip < 0 else PLAYER["None"] + print(self.PLAYER_MARK[pIdx], end="") + if self._showNumber: + mark = " " if self._ttt.Board[pos] == 0 else "{}".format(abs(self._ttt.Board[pos])) + print(mark, end="") + if col < self._ttt.BOARD_COLS - 1: + print(" | ", end="") + print("") + if (row < self._ttt.BOARD_ROWS - 1): + if self._showNumber: + print(" -----+------+-----") + else: + print(" ---+----+---") + + def ShowResult(self): + msg = "" + if self._ttt.Judge == JUDGE["WIN"]: + msg = "{}の勝利".format(self.PlayerStr()) + elif self._ttt.Judge == JUDGE["DRAW"]: + msg = "引き分け" + elif self._ttt.Judge == JUDGE["OUT_OF_RANGE"]: + msg = "{}の反則負け(範囲外)".format(self.PlayerStr()) + elif self._ttt.Judge == JUDGE["OVERLAP"]: + msg = "{}の反則負け(重ね置き)".format(self.PlayerStr()) + + print("") + self.ShowBoard() + print(msg) + + def PlayerStr(self): + return self.PLAYER_STR[self._ttt.Player] diff --git a/README.md b/README.md index 2355e6e..2b3a1b2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,15 @@ -PyTicTacToe_GA +PyTicTacToe =============== -拡張版〇×ゲームをpythonへと移植し,遺伝的アルゴリズムで勝とうと思ったときに作ったプロジェクト \ No newline at end of file +拡張版〇×ゲームをpythonへと移植し,遺伝的アルゴリズムで勝とうと思ったときに作ったプロジェクト + + +
+ +--- + +## 稼働方法 +1. イメージのビルド +docker build -f .devcontainer/Dockerfile -t marubatu . +2. コンテナの稼働 +docker run -it marubatu \ No newline at end of file diff --git a/TicTacToe.py b/TicTacToe.py new file mode 100644 index 0000000..3214548 --- /dev/null +++ b/TicTacToe.py @@ -0,0 +1,77 @@ +import numpy as np +from numba import jit + +PLAYER = { + "None": 0, + "First": 1, + "Second": 2 +} + +JUDGE = { + "None": "none", + "OUT_OF_RANGE" : "out_of_range", + "OVERLAP": "overlap", + "WIN": "win", + "DRAW": "draw" +} + + +class TicTacToe: + + def __init__(self, turn_limit = 100): + self.BOARD_ROWS = 3 + self.BOARD_COLS = 3 + self.BOARD_SIZE = 9 + self.REMAIN_PIECES = 3 + self.PLAYERS = 2 + self.TURN_LIMIT = turn_limit + self.NONE = 0 + self.LINES = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6]]) + + self.Board = np.array([self.NONE for i in range(self.BOARD_SIZE)]) + + def Init(self): + + self.Player = PLAYER["First"] + self.Turn = 1 + self.Judge = JUDGE["None"] + + + def Set(self, pos): + self.LastSet = pos + + if (pos < 0 or pos >= self.BOARD_SIZE): + self.Judge = JUDGE["OUT_OF_RANGE"] + return + + if (self.Board[pos] != 0): + self.Judge = JUDGE["OVERLAP"] + return + + if (self.Turn > self.REMAIN_PIECES): + self.Board[np.where(0 < self.Board)] -= 1 + self.Board[pos] = self.REMAIN_PIECES + else: + self.Board[pos] = self.Turn + + for line in self.LINES: + if (self.Board[line[0]] > 0 and self.Board[line[1]] > 0 and self.Board[line[2]] > 0): + self.Judge = JUDGE["WIN"] + return + + if self.Player == PLAYER["First"]: + self.Player = PLAYER["Second"] + else: + self.Player = PLAYER["First"] + self.Turn += 1 + if (self.Turn >= self.TURN_LIMIT): + self.Judge = JUDGE["DRAW"] + return + self.FlipBoard() + return + + def FlipBoard(self): + self.Board *= -1 + + def GetBoard(self): + return self.Board diff --git a/best_weight.npy b/best_weight.npy new file mode 100644 index 0000000..8dc86db --- /dev/null +++ b/best_weight.npy 
Binary files differ diff --git a/ga.py b/ga.py new file mode 100644 index 0000000..236aea2 --- /dev/null +++ b/ga.py @@ -0,0 +1,238 @@ +import numpy as np +import copy +import random +import time +from ConsolePlay import TTTConsole +from numba import jit + +JUDGE = { + "None": "none", + "OUT_OF_RANGE" : "out_of_range", + "OVERLAP": "overlap", + "WIN": "win", + "DRAW": "draw" +} + +input_vec_length = 54 +w1_length = 64 +out_length = 9 + +gene_length = 54 * 64 + 64 * 9 + +battle_num = 100 + +population = 50 +offspring_n = 24 +generation = 100000 +mutation_rate = 1.0/100.0 +print_freq = 100 + + +def init(): + gene_list = [np.random.rand(gene_length) for i in range(population)] + return gene_list + + +def sigmoid(x): + return 1 / (1 + np.exp(x)) + + +def RandTurn(board): + while True: + pos = np.random.randint(0, 8) + if board[pos] == 0: + break + return pos + + +def calc_penalty(judge, is_me): + if is_me: + penalty_dict = { + "overlap": 10000, + "win": 0, + "draw": 100 + } + return penalty_dict[judge] + else: + penalty_dict = { + "overlap": 0, + "win": 1000, + "draw": 100 + } + return penalty_dict[judge] + + +@jit +def nnTurn_module1(j, board_j, part_of_vec, input_vec): + if board_j < 0: + part_of_vec[board_j + 3] = 1.0 + elif 0 < board_j: + part_of_vec[board_j + 2] = 1.0 + input_vec[int(6 * j):int(6 * (j + 1))] = part_of_vec + return input_vec + +@jit +def nnTurn_module2(input_vec, w1, w2): + input_vec = input_vec.reshape((1, -1)) + out1 = np.dot(input_vec, w1) + out2 = 1 / (1 + np.exp(out1)) + out3 = np.dot(out2, w2) + pos = np.argmax(out3) + + return pos + + +def eval(gene_list): + penalty_list = list() + win_lose_draw_list = list() + for gene in gene_list: + w1 = gene[:(input_vec_length * w1_length)].reshape(w1_length, input_vec_length).T + w2 = gene[(input_vec_length * w1_length):].reshape(out_length, w1_length).T + + def nnTurn(board): + input_vec = np.zeros(input_vec_length, dtype=np.float) + for j in range(9): + part_of_vec = np.zeros(6).astype(np.float) + 
input_vec = nnTurn_module1(j, board[j], part_of_vec, input_vec) + + pos = nnTurn_module2(input_vec, w1, w2) + return pos + + cur_penalty = 0 + win_lose_draw = { + "win": 0, + "lose": 0, + "draw": 0, + "overlap": 0 + } + + for i in range(battle_num): + if i < battle_num // 2: + play = TTTConsole(nnTurn, RandTurn) + judge, player = play.Run() + cur_penalty += calc_penalty(judge, player == 1) + if judge == "win" and player == 1: + win_lose_draw["win"] += 1 + elif judge == "draw": + win_lose_draw["draw"] += 1 + elif judge == "overlap" and player == 1: + win_lose_draw["overlap"] += 1 + else: + win_lose_draw["lose"] += 1 + else: + play = TTTConsole(RandTurn, nnTurn) + judge, player = play.Run() + cur_penalty += calc_penalty(judge, player == 2) + if judge == "win" and player == 2: + win_lose_draw["win"] += 1 + elif judge == "draw": + win_lose_draw["draw"] += 1 + elif judge == "overlap" and player == 2: + win_lose_draw["overlap"] += 1 + else: + win_lose_draw["lose"] += 1 + penalty_list.append(cur_penalty) + win_lose_draw_list.append(win_lose_draw) + + return penalty_list, win_lose_draw_list + + +@jit +def choice_parents_module(population): + parent_1_index = np.random.randint(0, population - 1) + parent_2_index = np.random.randint(0, population - 1) + return parent_1_index, parent_2_index + + +def choice_parents(gene_list, fitness): + parent_1_index, parent_2_index = choice_parents_module(population) + if fitness[parent_1_index] > fitness[parent_2_index]: + parent = gene_list[parent_2_index] + else: + parent = gene_list[parent_1_index] + return parent + + +@jit +def crossover_module(parent_1, parent_2): + cross_one = random.randint(0, gene_length) + cross_second = random.randint(cross_one, gene_length) + + offspring_1 = parent_1.copy() + offspring_2 = parent_2.copy() + + offspring_1[cross_one:cross_second] = parent_2[cross_one:cross_second] + offspring_2[cross_one:cross_second] = parent_1[cross_one:cross_second] + + return offspring_1, offspring_2 + + +def 
crossover(gene_list, penalty_list): + parent_1 = choice_parents(gene_list, penalty_list) + parent_2 = choice_parents(gene_list, penalty_list) + + offspring_1, offspring_2 = crossover_module(parent_1, parent_2) + + return offspring_1, offspring_2 + + +@jit +def mutation(offspring): + target_index = np.where(np.random.rand(gene_length) < mutation_rate) + + return_array = np.random.rand(gene_length) + return_array[target_index] = offspring[target_index] + + return return_array + + +def elite(gene_list, penalty_list, next_gene, elite_n): + sort_penalty_list = sorted(penalty_list) + gen_tmp = [] + for i in range(elite_n): + index = penalty_list.index(sort_penalty_list[i]) + gen_tmp.append(gene_list[index]) + gen_tmp.extend(next_gene) + + return gen_tmp + + +def main(): + next_gene = [] + + gene_list = init() + best_penalty = 9999999 + + for generation_count in range(generation): + next_gene.clear() + penalty_list, win_lose_draw_list = eval(gene_list) + min_penalty = min(penalty_list) + ave_penalty = sum(penalty_list) / len(penalty_list) + + if min_penalty < best_penalty: + best_penalty = min_penalty + sort_penalty_list = sorted(penalty_list) + index = penalty_list.index(sort_penalty_list[0]) + np.save("best_weight", gene_list[index]) + + # print_result + if generation_count % print_freq == 0: + sort_penalty_list = sorted(penalty_list) + index = penalty_list.index(sort_penalty_list[0]) + best_result = win_lose_draw_list[index] + + print(f"generation: {generation_count} || min penalty: {min_penalty} || ave_penalty: {ave_penalty}") + print(f"best result... 
win:{best_result['win']} draw:{best_result['draw']} lose:{best_result['lose']} overlap:{best_result['overlap']}") + + for i in range(int(offspring_n / 2)): + offspring_1, offspring_2 = crossover(gene_list, penalty_list) + offspring_1 = mutation(offspring_1) + offspring_2 = mutation(offspring_2) + next_gene.extend([offspring_1, offspring_2]) + + gene_list = elite(gene_list, penalty_list, next_gene, (population - offspring_n)) + + + +if __name__ == '__main__': + main() diff --git a/main.py b/main.py new file mode 100644 index 0000000..049147e --- /dev/null +++ b/main.py @@ -0,0 +1,70 @@ +from ConsolePlay import TTTConsole +import numpy as np +from copy import deepcopy +from reinforcement.MinMaxAct import minMaxAct + + +def checkReach(board, Is_me = True): + board = board.copy + lines = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6]]) + if not Is_me: + board *= -1 + for check_line in lines: + line_state = np.array([board[i] for i in check_line]) + + # 消え始めてから + if np.any(line_state == 2) and np.any(line_state == 3): + index = np.argmin(line_state) + if board[check_line[index]] == 0: + return check_line[index] + + # 序盤にリーチしてたとき + if np.any(line_state == 1) and np.any(line_state == 2) and not np.any(board == 3): + index = np.argmin(line_state) + if board[check_line[index]] == 0: + return check_line[index] + + return None + + + +def HumanTurn(board): + pos = 0 + while True: + print("どこに置きますか? 
(1:左上 - 9:右下): ") + pos = int(input()) + if 1 <= pos <= 9 and board[pos - 1] == 0: + break + return pos - 1 + + +def RandTurn(board): + pos = 0 + + while True: + pos = np.random.randint(0, 8) + if board[pos] == 0: + break + return pos + + +def SimpleCPU(board): + win_pos = checkReach(board, Is_me=True) + if win_pos is not None: + return win_pos + + difencive_pos = checkReach(board, Is_me=False) + if difencive_pos is not None: + return difencive_pos + + while True: + pos = np.random.randint(0, 8) + if board[pos] == 0: + break + + return pos + + +if __name__ == '__main__': + play = TTTConsole(HumanTurn, RandTurn, Is_shown=True) + play.Run() \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9d84e25 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,29 @@ +[tool.isort] +profile = "black" +line_length = 120 +skip_glob = "*/migrations/*.py" + +[tool.black] +line-length = 120 +include = '\.pyi?$' +extend-exclude = ''' +/( + | \.git + | templates + | migrations +)/ +''' + +[tool.flake8] +max-line-length = 120 +extend-ignore = "E203,W503" + + +[tool.mypy] +follow-imports = "normal" +ignore_missing_imports = true +show_column_numbers = true +pretty = false +disallow_untyped_calls = true +disallow_untyped_defs = true + diff --git a/reinforcement/AdaptiveTTT.py b/reinforcement/AdaptiveTTT.py new file mode 100644 index 0000000..397e185 --- /dev/null +++ b/reinforcement/AdaptiveTTT.py @@ -0,0 +1,238 @@ +import numpy as np +from numba import jit +from copy import deepcopy + + +def checkReach(board, Is_me = True): + board = deepcopy(board) + lines = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6]]) + if not Is_me: + board *= -1 + for check_line in lines: + line_state = np.array([board[i] for i in check_line]) + + # 消え始めてから + if np.any(line_state == 2) and np.any(line_state == 3): + index = np.argmin(line_state) + if board[check_line[index]] == 0: + return check_line[index] + + # 
序盤にリーチしてたとき + if np.any(line_state == 1) and np.any(line_state == 2) and not np.any(board == 3): + index = np.argmin(line_state) + if board[check_line[index]] == 0: + return check_line[index] + + return None + + +def SimpleCPU(board): + win_pos = checkReach(board, Is_me=True) + if win_pos is not None: + return win_pos + + difencive_pos = checkReach(board, Is_me=False) + if difencive_pos is not None: + return difencive_pos + + while True: + pos = np.random.randint(0, 8) + if board[pos] == 0: + break + + return pos + + +def RandTurn(board): + pos = 0 + + while True: + pos = np.random.randint(0, 8) + if board[pos] == 0: + break + return pos + + +PLAYER = { + "None": 0, + "First": 1, + "Second": 2 +} + +JUDGE = { + "None": "none", + "OUT_OF_RANGE" : "out_of_range", + "OVERLAP": "overlap", + "WIN": "win", + "DRAW": "draw" +} + + +class TicTacToe: + + def __init__(self, turn_limit = 100): + self.BOARD_ROWS = 3 + self.BOARD_COLS = 3 + self.BOARD_SIZE = 9 + self.REMAIN_PIECES = 3 + self.PLAYERS = 2 + self.TURN_LIMIT = turn_limit + self.NONE = 0 + self.LINES = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6]]) + + self.Board = np.array([self.NONE for i in range(self.BOARD_SIZE)]) + + def Init(self): + + self.Player = PLAYER["First"] + self.Turn = 1 + self.Judge = JUDGE["None"] + self.Board = np.array([self.NONE for i in range(self.BOARD_SIZE)]) + + + def Set(self, pos): + self.LastSet = pos + + if (pos < 0 or pos >= self.BOARD_SIZE): + self.Judge = JUDGE["OUT_OF_RANGE"] + return + + if (self.Board[pos] != 0): + self.Judge = JUDGE["OVERLAP"] + return + + if (self.Turn > self.REMAIN_PIECES): + self.Board[np.where(0 < self.Board)] -= 1 + self.Board[pos] = self.REMAIN_PIECES + else: + self.Board[pos] = self.Turn + + for line in self.LINES: + if (self.Board[line[0]] > 0 and self.Board[line[1]] > 0 and self.Board[line[2]] > 0): + self.Judge = JUDGE["WIN"] + return + + if self.Player == PLAYER["First"]: + self.Player = 
PLAYER["Second"] + else: + self.Player = PLAYER["First"] + self.Turn += 1 + if (self.Turn >= self.TURN_LIMIT): + self.Judge = JUDGE["DRAW"] + return + self.FlipBoard() + return + + def FlipBoard(self): + self.Board *= -1 + + def GetBoard(self): + return self.Board + + +class TTTConsole: + def __init__(self, Is_shown=False): + self.PLAYER_STR = ["", "先手", "後手"] + self.PLAYER_MARK = [" ", "○", "×"] + + self._ttt = TicTacToe() + self._showNumber = True + self.Is_shown = Is_shown + + def ShowTitle(self): + print("拡張〇×ゲーム") + + def step(self, agent_act): + # agent + if self.Is_shown: + print("") + self.ShowBoard() + print("{}の番".format(self.PlayerStr())) + self._ttt.Set(agent_act(self._ttt.GetBoard())) + if self.Is_shown: + print("--> {}".format(self._ttt.LastSet + 1)) + + if self._ttt.Judge != JUDGE["None"]: + if self.Is_shown: + self.ShowResult() + return self._ttt.Judge, self._ttt.Player, self._ttt.GetBoard() + + # 敵 + if self.Is_shown: + print("") + self.ShowBoard() + print("{}の番".format(self.PlayerStr())) + self._ttt.Set(SimpleCPU(self._ttt.GetBoard())) + if self.Is_shown: + print("--> {}".format(self._ttt.LastSet + 1)) + + if self._ttt.Judge != JUDGE["None"]: + if self.Is_shown: + self.ShowResult() + + return self._ttt.Judge, self._ttt.Player, self._ttt.GetBoard() + + + def ShowBoard(self): + print("--- ターン", self._ttt.Turn, " ---") + + flip = -1 if self._ttt.Player == PLAYER["Second"] else 1 + for row in range(self._ttt.BOARD_ROWS): + print(" ", end="") + for col in range(self._ttt.BOARD_COLS): + pos = row * self._ttt.BOARD_COLS + col + pIdx = PLAYER["First"] if self._ttt.Board[pos] * flip > 0 else PLAYER["Second"] if self._ttt.Board[pos] * flip < 0 else PLAYER["None"] + print(self.PLAYER_MARK[pIdx], end="") + if self._showNumber: + mark = " " if self._ttt.Board[pos] == 0 else "{}".format(abs(self._ttt.Board[pos])) + print(mark, end="") + if col < self._ttt.BOARD_COLS - 1: + print(" | ", end="") + print("") + if (row < self._ttt.BOARD_ROWS - 1): + if 
self._showNumber: + print(" -----+------+-----") + else: + print(" ---+----+---") + + def ShowResult(self): + msg = "" + if self._ttt.Judge == JUDGE["WIN"]: + msg = "{}の勝利".format(self.PlayerStr()) + elif self._ttt.Judge == JUDGE["DRAW"]: + msg = "引き分け" + elif self._ttt.Judge == JUDGE["OUT_OF_RANGE"]: + msg = "{}の反則負け(範囲外)".format(self.PlayerStr()) + elif self._ttt.Judge == JUDGE["OVERLAP"]: + msg = "{}の反則負け(重ね置き)".format(self.PlayerStr()) + + print("") + self.ShowBoard() + print(msg) + + def PlayerStr(self): + return self.PLAYER_STR[self._ttt.Player] + + def init_TTTenv(self): + self._ttt.Init() + + def reset_randTTT_env(self): + self._ttt.Init() + + # ここは0~6にしたほうが良い? + init_turn = np.random.randint(0, 1) + + for _ in range(init_turn): + if self.Is_shown: + print("") + self.ShowBoard() + print("{}の番".format(self.PlayerStr())) + self._ttt.Set(RandTurn(self._ttt.GetBoard())) + if self.Is_shown: + print("--> {}".format(self._ttt.LastSet + 1)) + + if self._ttt.Judge != JUDGE["None"]: + self.ShowResult() + + state = tuple(self._ttt.GetBoard()) + return state diff --git a/reinforcement/MinMaxAct.py b/reinforcement/MinMaxAct.py new file mode 100644 index 0000000..5c085bf --- /dev/null +++ b/reinforcement/MinMaxAct.py @@ -0,0 +1,100 @@ +from copy import deepcopy +import numpy as np +from tqdm import tqdm +from numba import njit, prange + +@njit(cache=True) +def makeNextBoard(set_pos, board, cur_peaces_num): + copy_board = board.copy() + if cur_peaces_num == 3: + copy_board[np.where(0 < copy_board)] -= 1 + copy_board[set_pos] = 3 + else: + copy_board[set_pos] = cur_peaces_num + 1 + + return copy_board + + +@njit(cache=True) +def evaluate(depth, board, is_me): + lines = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 8], [0, 3, 6], [1, 4, 7], [2, 5, 8], [0, 4, 8], [2, 4, 6]]) + for line in lines: + if (board[line[0]] > 0 and board[line[1]] > 0 and board[line[2]] > 0): + if is_me: + return 10 - depth + else: + return depth - 10 + + return 0 + + +@njit(cache=True) +def minmax(depth, 
board, is_me, max_depth=10): + copy_board = board.copy() + + eval_val = evaluate(depth, copy_board, is_me) + + if eval_val != 0 or depth == max_depth: + return eval_val + + # 次にさせる場所をチェック + # flipboard + is_me = not is_me + copy_board *= -1 + + next_pos_arr = np.where(copy_board == 0)[0] + cur_my_peaces = np.where(copy_board > 0)[0].shape[0] + + best_value = 0 + value = -10000 if is_me else 10000 + + for pos_cand in next_pos_arr: + next_board = makeNextBoard(pos_cand, copy_board, cur_my_peaces) + child_val = minmax(depth + 1, next_board, is_me, max_depth) + + if is_me: + if child_val > value: + value = child_val + best_pos_cand = pos_cand + else: + if child_val < value: + value = child_val + best_pos_cand = pos_cand + # valueを空にする + + return value + + +@njit(parallel=True, cache=True) +def minMaxAct_submodule(copy_board, cur_my_peaces, eval_arr): + for pos_cand in prange(9): + + if copy_board[pos_cand] == 0: + next_board = makeNextBoard(pos_cand, copy_board, cur_my_peaces) + eval_arr[pos_cand] = minmax(0, next_board, True, max_depth=20) + + return eval_arr + + +def minMaxAct(board): + eval_arr = np.zeros(9) + copy_board = board.copy() + cur_my_peaces = np.where(copy_board > 0)[0].shape[0] + + # compile用 + # minmax(20, copy_board, True) + + #if cur_my_peaces == 0 and board[1] == board[3] == board[5] == board[7] == 0: + # return [1, 3, 5, 7][np.random.randint(0, 4)] + + eval_arr = minMaxAct_submodule(copy_board, cur_my_peaces, eval_arr) + + print(eval_arr) + + pos = 0 + while True: + print("どこに置きますか? 
(1:左上 - 9:右下): ") + pos = int(input()) + if 1 <= pos <= 9 and board[pos - 1] == 0: + break + return pos - 1 \ No newline at end of file diff --git a/reinforcement/QLearningAgent.py b/reinforcement/QLearningAgent.py new file mode 100644 index 0000000..f80f0aa --- /dev/null +++ b/reinforcement/QLearningAgent.py @@ -0,0 +1,113 @@ +import copy +import numpy as np + +class QLearningAgent: + """ + Q学習 エージェント + """ + + def __init__( + self, + alpha=.2, + epsilon=.1, + gamma=.99, + actions=None, + observation=None): + self.alpha = alpha + self.gamma = gamma + self.epsilon = epsilon + self.reward_history = [] + self.actions = actions + self.state = str(observation) + self.ini_state = str(observation) + self.previous_state = None + self.previous_action = None + self.q_values = self._init_q_values() + + def _init_q_values(self): + """ + Q テーブルの初期化 + """ + q_values = {} + q_values[self.state] = np.repeat(0.0, len(self.actions)) + return q_values + + def init_state(self): + """ + 状態の初期化 + """ + self.previous_state = copy.deepcopy(self.ini_state) + self.state = copy.deepcopy(self.ini_state) + return self.state + + def act(self, board): + # ε-greedy選択 + if np.random.uniform() < self.epsilon: # random行動 + while True: + action = np.random.randint(0, len(self.q_values[self.state])) + if board[action] == 0: + break + else: # greedy 行動 + tmp = self.q_values[self.state].copy() + index = 0 + # ここはもっと考えなくちゃいけなさそう + while True: + action = tmp.argsort()[::-1][index] + if board[action] == 0: + break + index += 1 + + self.previous_action = action + return action + + def observe(self, next_state, reward=None): + """ + 次の状態と報酬の観測 + """ + next_state = str(next_state) + if next_state not in self.q_values: # 始めて訪れる状態であれば + self.q_values[next_state] = np.repeat(0.0, len(self.actions)) + + self.previous_state = copy.deepcopy(self.state) + self.state = next_state + + if reward is not None: + self.reward_history.append(reward) + self.learn(reward) + + def learn(self, reward): + """ + Q値の更新 + """ + q 
= self.q_values[self.previous_state][self.previous_action] # Q(s, a) + max_q = max(self.q_values[self.state]) # max Q(s') + # Q(s, a) = Q(s, a) + alpha*(r+gamma*maxQ(s')-Q(s, a)) + self.q_values[self.previous_state][self.previous_action] = q + \ + (self.alpha * (reward + (self.gamma * max_q) - q)) + + def test_observe(self, next_state): + next_state = str(next_state) + + self.previous_state = copy.deepcopy(self.state) + self.state = next_state + + def test_act(self, board): + # greedy 行動 + if self.state in self.q_values: + tmp = self.q_values[self.state].copy() + index = 0 + # ここはもっと考えなくちゃいけなさそう + while True: + action = tmp.argsort()[::-1][index] + if board[action] == 0: + break + index += 1 + + self.previous_action = action + else: + while True: + print("ランダム行動を実行") + action = np.random.randint(0, 8) + if board[action] == 0: + break + return action diff --git a/reinforcement/QLearningUtils.py b/reinforcement/QLearningUtils.py new file mode 100644 index 0000000..7e4fa4a --- /dev/null +++ b/reinforcement/QLearningUtils.py @@ -0,0 +1,47 @@ +JUDGE = { + "None": "none", + "OUT_OF_RANGE" : "out_of_range", + "OVERLAP": "overlap", + "WIN": "win", + "DRAW": "draw" +} + + +def ReferRewrads(judge, is_me, board): + state = tuple(board) + if judge == "none": + reward, is_end_episode = 0, False + return state, reward, is_end_episode + + if is_me: + reward_dict = { + "out_of_range": -100, + "overlap": -100, + "win": 100, + "draw": -1 + } + is_end_episode = True + + return state, reward_dict[judge], is_end_episode + + else: + reward_dict = { + "out_of_range": 0, + "overlap": 0, + "win": -100, + "draw": -1 + } + is_end_episode = True + + return state, reward_dict[judge], is_end_episode + + +def decodeResult(judge, is_me): + if judge == "draw": + return "draw" + elif is_me and judge == "win": + return "win" + elif is_me and judge == "overlap": + return "overlap" + else: + return "lose" diff --git a/reinforcement/main.py b/reinforcement/main.py new file mode 100644 index 
0000000..8af757b --- /dev/null +++ b/reinforcement/main.py @@ -0,0 +1,93 @@ +import numpy as np +from QLearningAgent import QLearningAgent +from AdaptiveTTT import TTTConsole +from QLearningUtils import ReferRewrads, decodeResult +import pickle + +NB_EPISODE = 5000000 # エピソード数 +EPSILON = 0.1 # 探索率 +ALPHA = 0.1 # 学習率 +GAMMA = 0.90 # 割引率 +ACTIONS = np.arange(9) # 行動の集合 + +if __name__ == '__main__': + ttt_env = TTTConsole(Is_shown=False) + ini_state = (0, 0, 0, 0, 0, 0, 0, 0, 0) + agent = QLearningAgent( + alpha=ALPHA, + gamma=GAMMA, + epsilon=EPSILON, + actions=ACTIONS, + observation=ini_state + ) + rewards = [] + is_end_episode = False + ttt_env.init_TTTenv() + myplayer_num = ttt_env._ttt.Player + + result_dict = { + "win": 0, + "draw": 0, + "lose": 0, + "overlap": 0 + } + + for episode in range(NB_EPISODE): + episode_reward = list() + + while(is_end_episode == False): + action = agent.act + judge, player, board = ttt_env.step(action) + state, reward, is_end_episode = ReferRewrads(judge, player == myplayer_num, board) + agent.observe(state, reward) + episode_reward.append(reward) + result = decodeResult(judge, player == myplayer_num) + result_dict[result] += 1 + if (episode) % 10000 == 9999: + print(f"[episode: {episode + 1}] win: {result_dict['win']} draw: {result_dict['draw']} lose: {result_dict['lose']} overlap: {result_dict['overlap']}") + result_dict = { + "win": 0, + "draw": 0, + "lose": 0, + "overlap": 0 + } + rewards.append(np.sum(episode_reward)) + is_end_episode = False + state = ttt_env.reset_randTTT_env() + myplayer_num = ttt_env._ttt.Player + agent.observe(state) + + agent.epsilon = 0.0 + + result_dict = { + "win": 0, + "draw": 0, + "lose": 0, + "overlap": 0 + } + + for episode in range(100000): + + while(is_end_episode == False): + action = agent.test_act + judge, player, board = ttt_env.step(action) + state, reward, is_end_episode = ReferRewrads(judge, player == myplayer_num, board) + agent.test_observe(state) + result = decodeResult(judge, player == 
myplayer_num) + result_dict[result] += 1 + if (episode) % 10000 == 9999: + print( + f"[episode: {episode + 1}] win: {result_dict['win']} draw: {result_dict['draw']} lose: {result_dict['lose']} overlap: {result_dict['overlap']}") + result_dict = { + "win": 0, + "draw": 0, + "lose": 0, + "overlap": 0 + } + is_end_episode = False + state = ttt_env.reset_randTTT_env() + myplayer_num = ttt_env._ttt.Player + agent.test_observe(state) + + with open("q_values.pkl", "wb") as f: + pickle.dump(agent.q_values, f) diff --git a/reinforcement/q_values.pkl b/reinforcement/q_values.pkl new file mode 100644 index 0000000..62889cb --- /dev/null +++ b/reinforcement/q_values.pkl Binary files differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..48bc451 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,19 @@ +black==22.1.0 +click==8.0.4 +flake8==4.0.1 +isort==5.10.1 +llvmlite==0.38.0 +mccabe==0.6.1 +mypy==0.931 +mypy-extensions==0.4.3 +numba==0.55.1 +numpy==1.21.5 +pathspec==0.9.0 +platformdirs==2.5.1 +pycodestyle==2.8.0 +pyflakes==2.4.0 +pyproject-flake8==0.0.1a2 +toml==0.10.2 +tomli==2.0.1 +tqdm==4.63.0 +typing_extensions==4.1.1 \ No newline at end of file