Blow Up by Black Swan

Pythonーplotly dashを使ったチャートアプリの作成(プログラミング編1)

今回は具体的なプログラミングについてまとめました。プログラム編については分量が多くなってきてしまったため、3つに分けています。他のシリーズは下記になります。

初めて自分でオリジナルのプログラムを構築して行く中で、プログラミングは非常に数学の場合分けに似ているなということです。あるプログラムを実行した時にどの様な可能性があり、それぞれに対しさらにどの様なプログラムを付け加えるか、それを全体と細部の両方において行っていくのはある種パズルのようで面白いなと思いました。

全体と細部でどういう風に構成されているか、できるだけわかりやすく書いたつもりですので、参考になれば幸いです。

1. チャートアプリのプログラム構造

チャートアプリ全体のプログラムは以下の3つの部分に分けることができます。

  1. APIからデータを取得するプログラム
  2. 表示期間に対応するデータを持つテーブルを作成・更新するプログラム
  3. チャート表示するアプリケーション

ここで2のテーブルを表示期間に対応するテーブルを作成した理由は、表示期間が長期になる程データ量が莫大になり、かなり重たくなることが考えられたためです。例えば、APIから取得したデータが直に保存されるベーステーブルのデータを使って、期間1ヶ月のチャートを表示する場合、期間1日のチャートを表示する場合に比べ、単純計算でも約30倍近いデータを通信する必要があります。

そのため、それぞれの期間に合わせてベーステーブルから大幅にデータを間引いた専用テーブルを作成しています。この専用テーブルを作成することで、各表示期間に対応したデータにより簡単に、かつ軽量なデータのやり取りを行うことができます。以前の記事にも載せた全体図は下記になります。

アプリケーションアーキテクチャ

1-1. その他のプログラム案

プログラム構造については次のような形式でもよかったのかなと思っています。

  • チャートデータは毎回MySQLserverから直に取得しているが、キャッシュを利用し、最新データ以外はキャッシュで保管し、アクセススピードやMySQLserverへの負荷低下を図っても良かった
  • 実行時に設定ファイルが指定できる様にし、テスト用と本番用を簡単に入れ替えられる様にしておけるとデバッグなどがやりやすくなると思われる

2. APIからデータを取得するプログラム

まずはAPIからデータを取得するプログラムです。チャートの基礎となるデータを取得する部分で重要なプログラムです。必要なファイルは以下の4つです。

  • make_BaseTable.py -> APIからデータを取得するメインプログラム
  • db.py -> データベースの処理に関係する関数を格納するプログラム
  • schema.py -> DBの構造を規定するファイル
  • config.ini -> メインプログラムで必要となる設定情報を格納するファイル

2-1. 全体構成と役割

プログラムの全体の流れは以下になります。

   ① データベース作成(make_db)
-> ② APIから過去データの取得(bf.past_data)
      APIのparam,"before"と"after"を設定することで過去データを取得
-> ③ リアルタイムでAPIからデータを取得(bf.real_data)
      past_dateの戻り値のlast_idを受け取り、whileループでAPIからデータを継続的に取得

API関連の処理についてはクラス文でまとめています。

2-2. プログラムの作り

次は各プログラムの作りです。

2-2-1. db.py

下記が「db.py」の記載内容です。

"""
DBの取り扱いに関する関数を格納
get_db(): DBとの接続
make_db(): DBと関連テーブルの作成(基本は初回の1回しか実行しない)
insert_db(): DBへのデータ保存
"""
import mysql.connector


def get_db(config):
    """
    DB serverとの接続用メソッド
    :param config: DBと接続するための設定ファイル
    :return: Connectionオブジェクト, Cursorオブジェクト
    """
    try:
        con_obj = mysql.connector.connect(**config)
    except Exception as err:    #エラーが出た場合の処理
        exit()
    cur_obj = con_obj.cursor()
    return con_obj, cur_obj


def make_db(queries, config):
    """
    DBを作成するメソッド
    :param queries: SQL文
    :param config: DBと接続するための設定ファイル
    :return: nothing
    """
    con_obj, cur_obj = get_db(config)
    for query in queries:
        cur_obj.execute(query)
    cur_obj.close()
    con_obj.close()


def insert_db(con_obj, cur_obj, data_list):
    """
    DBにデータを追加するメソッド
    :param con_obj: Connectoinオブジェクト
    :param cur_obj: Cursorオブジェクト
    :param data_list: <list> 追加データのリスト
    :return: nothing
    """
    insert_query = (
        "INSERT INTO blowupbbs_crypto.bf_base (id, price, timestamp) "
        "VALUES (%s, %s, %s);"
    )
    cur_obj.executemany(insert_query, data_list)
    con_obj.commit()

2-2-2. schema.py

DBを作成するためのスキーマである「schema.py」は以下になります。

"""
スキーマの構造
1. 同名データベースがあれば削除
2. データベース作成(blowupbbs_crypto)
3. ベースとなるテーブル作成(bf_base)
   column: no, id, price, timestamp
4. 各憑依期間に対応するテーブルを作成(bf_aday, bf_aweek, bf_amonth)
   column: no, base_no(bf_baseのno), id, price, timestamp
"""

db_schema = [
    "DROP DATABASE IF EXISTS blowupbbs_crypto;",  # DB作成
    "CREATE DATABASE blowupbbs_crypto;",
    "CREATE TABLE blowupbbs_crypto.bf_base (no int AUTO_INCREMENT UNIQUE NOT NULL,"
                             "id int NOT NULL,"
                             "price DOUBLE(9,1) NOT NULL,"
                             "timestamp DOUBLE(13,3) NOT NULL,"
                             "primary key(id), index(no));",
    "CREATE TABLE blowupbbs_crypto.bf_aday (no int AUTO_INCREMENT UNIQUE NOT NULL,"
                             "base_no int NOT NULL,"
                             "id int NOT NULL,"
                             "price DOUBLE(9,1) NOT NULL,"
                             "timestamp DOUBLE(13,3) NOT NULL,"
                             "primary key(id),index(no));",
    "CREATE TABLE blowupbbs_crypto.bf_aweek (no int AUTO_INCREMENT UNIQUE NOT NULL,"
                              "base_no int NOT NULL,"
                              "id int NOT NULL,"
                              "price DOUBLE(9,1) NOT NULL,"
                              "timestamp DOUBLE(13,3) NOT NULL,"
                              "primary key(id),""index(no));",
    "CREATE TABLE blowupbbs_crypto.bf_amonth (no int AUTO_INCREMENT UNIQUE NOT NULL,"
                               "base_no int NOT NULL,"
                               "id int NOT NULL,"
                               "price DOUBLE(9,1) NOT NULL,"
                               "timestamp DOUBLE(13,3) NOT NULL,"
                               "primary key(id), index(no));"
]

2-2-3. config.ini

次が設定ファイルの「config.ini」です。メインプログラムの諸設定が記載されています。実際には次回説明する「sort_to_data.py」と共有しており、それ用の設定も記載しています。

[API_CONFIG]
URL = https://api.bitflyer.jp/v1/getexecutions
COUNT = 500    # 500が最大値
AFTER = <number>    # 2018年12月19日から取得できるデータは最大で1ヶ月前のものまでに変更
BEFORE = <number>

[SLACK_CONFIG]
SLACK_URL = <SLACKのURL>

[DBserverT]
USER = <db_user>
HOST = <db_host>
PASSWORD = <db_password>

メインプログラムでは以上のデータをconfigparserモジュールを利用して、読み込んでいます。

2-2-4. make_BaseTable.py

最後が「make_BaseTable.py」ファイルです。

"""
ベースとなるテーブルを作るための実行ファイル
BF_apiクラス
  __init__ -> コンストラクタ
  to_timestamp -> 取得したAPIデータ内の時刻データをタイムスタンプに変換
  slack_error -> slackへの連絡用メソッド
  past_data -> 過去データを取得するメソッド
  real_data -> リアルタイムでAPIからデータを取得するメソッド
"""

import configparser
import datetime
import json
import os
import time
import traceback

import pytz
import requests

from db import get_db, make_db, insert_db
# from schema import db_schema


class BF_api:
    def __init__(self):
        """
        コンストラクタ
        """
        config_path = os.path.join(os.path.abspath("."), "config.ini")
        cfg = configparser.ConfigParser()
        cfg.read(config_path)
        self.api_url = cfg["API_CONFIG"]["URL"]
        self.count = cfg["API_CONFIG"]["COUNT"]
        self.after = cfg["API_CONFIG"]["AFTER"]
        self.before = cfg["API_CONFIG"]["BEFORE"]
        self.config = {
            "user": cfg["DBserverT"]["USER"],
            "host": cfg["DBserverT"]["HOST"],
            "password": cfg["DBserverT"]["PASSWORD"]
        }
        self.slack_url = cfg["SLACK_CONFIG"]["SLACK_URL"]

    def to_timestamp(self, dt):
        """
        APIから取得したデータ内の時刻データをタイムスタンプに変換
        :param dt: <str> 時刻データ
        :return: <int> タイムスタンプ
        """
        try:
            dt_obj = datetime.datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S.%f")
        except:
            dt_obj = datetime.datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S")
        tz_obj = pytz.utc.localize(dt_obj)
        ts = tz_obj.timestamp()
        return ts

    def slack_error(self, concept, err, err_c, place):
        """
        エラー情報をslackに送るメソッド
        :param concept: エラー概要
        :param err: エラー内容1
        :param err_c: エラー内容2
        :param place: エラー個所
        :return: nothing
        """
        slack_url = self.slack_url
        payload = {
            "attachments": [
                {
                    "pretext": "*ERROR*",
                    "title": "{}".format(concept),
                    "text": "error: {0}\nerror contents: {1}\nplace: {2}".format(err, err_c, str(place)),
                    "mrkdwn_in": ["pretext"]
                }
            ]
        }
        requests.post(slack_url, data=json.dumps(payload))

    # param設定-> ループ【apiアクセス -> json化 -> データの並び替え -> データ保存 -> param修正】 -> con,curクローズ
    def past_data(self):
        """
        APIから過去データを取得するメソッド
        (1) param設定
        (2)ループ
            (2-1)APIアクセス
                (想定エラー1) APIに問題がある(ステータスコードが200でない) -> slackへ連絡
                (想定エラー2) idで決めた区間にデータがない場合(空のリスト)
                    2-1 現在時刻近辺までデータ取得が進んでいる -> ループ抜け出しreal_dataの呼び出しへ
                    2-2 まだ十分に過去データが取得できていない -> 次のid区間のAPIデータ取得へ
            (2-2)データの並び替え・整理
            (2-3)データ保存
            (2-4)param修正
        (3)DBとのコネクション切断
        ※ ところどころにある"time.sleep(1)はAPIの呼び出し制限に引っかからない様にするため
        :return: <int> last_id -> real_dateメソッドにAPIデータの重複や空白なく引き継ぐために利用
        """
        # (1)param設定
        params = {'count': self.count,
                  'after': self.after,
                  'before': self.before
                  }

        con_obj, cur_obj = get_db(self.config)
        last_ts = 0

        # (2)ループ
        while True:
            # (2-1)APIアクセス
            api_data_json = requests.get(self.api_url, params)
            # 想定エラー1処理: APIに問題がある -> slackに連絡
            if api_data_json.status_code != 200:
                concept = "api server error"
                err = "status code error"
                try:
                    err_c = api_data_json.json()
                except Exception as err:
                    er = "json error"
                    err_c = err
                place = params['after']
                self.slack_error(concept, er, err_c, place)
                break
            api_data = api_data_json.json()
            # 想定エラー2処理: 対象区間にデータが存在しない ->
            if not api_data:
                current_time = time.time()
                # 想定エラー2-1 現在時刻近辺までデータ取得が進んでいる -> ループ抜け出しreal_dataの呼び出しへ
                if (current_time - last_ts) <= (60 * 5):
                    break
                # 想定エラー2-2 まだ十分に過去データが取得できていない -> 次のid区間のAPIデータ取得へ
                params['before'] = int(params['before']) + 500
                time.sleep(0.1)
                continue
            # (2-2)データの並び替え・整理
            api_data.sort(key=lambda x: x['id'])
            data_list = []
            for i in range(len(api_data)):
                data_list.append((api_data[i]['id'],
                                  api_data[i]['price'],
                                  self.to_timestamp(api_data[i]['exec_date'])))  # APIデータの整形作業完成
            last_id = api_data[-1]['id']
            last_ts = data_list[-1][-1]

            # (2-3)データ保存部分
            try:
                insert_db(con_obj, cur_obj, data_list)
            except Exception as err_c:    # エラーが生じた場合の対応
                concept = "DB insert error"
                err = "Mistake executing sql query"
                place = api_data[0]['id']
                self.slack_error(concept, err, err_c, place)
                break
            # (2-4)paramの再設定
            params['after'] = last_id
            params['before'] = last_id + 500
            time.sleep(0.1)

        # (3)DBとのコネクション切断
        cur_obj.close()
        con_obj.close()
        print("end: past_data()")
        return last_id    # real_dateでつつがなく引き継ぐため

    def real_data(self, last_id):
        """
        リアルタイムデータ取得メソッド
        (1) param設定
        (2)ループ
            (2-1)APIアクセス
                (想定エラー1) APIに問題がある(ステータスコードが200でない) -> slackへ連絡
                (想定エラー2) idで決めた区間にデータがない場合(空のリスト) -> 時間を置いて再度(1)APIアクセス
            (2-2)データの並び替え・整理
            (2-3)データ保存
            (2-4)param修正
        :param last_id: <int> past_dateメソッドから返されたもの
        :return: nothing
        """
        # (1)param設定
        params = {'count': self.count,
                  'after': last_id}

        con_obj, cur_obj = get_db(self.config)

        # (2)ループ
        while True:
            # (2-1)APIアクセス
            api_data_json = requests.get(self.api_url, params)
            # (想定エラー1) APIに問題がある(ステータスコードが200でない) -> slackへ連絡
            if api_data_json.status_code != 200:
                concept = "api server error"
                err = "status code error"
                err_c = traceback.format_exc()
                place = params['after']
                self.slack_error(concept, err, err_c, place)
                exit()
            api_data = api_data_json.json()
            # (想定エラー2) idで決めた区間にデータがない場合(空のリスト) -> 時間を置いて再度(1)APIアクセス
            if not api_data:
                time.sleep(3)
                continue
            # (2-2)データの並び替え・整理
            api_data.sort(key=lambda x: x['id'])
            data_list = []
            for i in range(len(api_data)):
                data_list.append([api_data[i]['id'],
                                  api_data[i]['price'],
                                  self.to_timestamp(api_data[i]['exec_date'])])
            print(api_data[-1]['exec_date'])
            last_id = api_data[-1]['id']

            # (3)データ保存
            try:
                insert_db(con_obj, cur_obj, data_list)
            except Exception as err_c:
                concept = "DB insert error"
                err = "Mistake executing sql query"
                place = api_data[0]['id']
                self.slack_error(concept, err, err_c, place)
                break
            # (4)param修正
            params['after'] = last_id
            time.sleep(3)


# 実行プログラム
if __name__ == '__main__':
    bf = BF_api()
    make_db(db_schema, bf.config)
    last_id = bf.past_data()
    bf.real_data(last_id)

3. まとめ

以上がAPIからデータを取得するプログラムです。次は、各機関に対応するテーブルを作成するプログラムについてです。

読んで頂き、ありがとうございました。