Blow Up by Black Swan

Pythonーplotly dashを使ったチャートアプリの作成(プログラミング編2)

今回は、プログラム編の2回目で、表示期間に対応するデータを持つテーブルを作成・更新するプログラムについてのまとめです。また他の記事は以下になります。

1. グラフ表示の各期間用のプログラム

今回のプログラムはベースとなるテーブルから各期間に対応するチャート用テーブルを作るものです。

1-1. 全体の構成と役割

この個別のテーブルを作成する理由は、ベーステーブルからデータをとるとデータ量があまりにも多くなり、チャートの表示に大きな支障をきたす可能性があるからです。そのため、ある程度のデータを間引いたテーブルを作成し、そこからデータを取得することで、スムーズにチャートが表示されることを意図しています。各期間のチャートとその基盤となるデータを格納するテーブルとの関係は以下になります。

  • 1ヶ月チャート -> bf_amonth
  • 1週間チャート -> bf_aweek
  • 1日チャート -> bf_aday
  • 6時間チャート -> bf_base(ベーステーブル)
  • 1時間チャート -> bf_base(ベーステーブル)

また、ベーステーブルから期間表示用の各テーブルへのデータの振り分け方法は下記になります。

  • 1ヶ月チャート(bf_amonth) -> 2時間に1データ
  • 1週間チャート(bf_aweek) -> 30分に1データ
  • 1日チャート(bf_aday) -> 5分に1データ

1チャートあたりのデータを300前後に抑えることを目処に上記の様な振り分けを行なっています。ファイル構造は以下になります。

  • sort_to_data.py -> ベーステーブル(bf_base)に溜まった過去のデータを仕分けしていくプログラム。現在時刻近辺まで来るとプログラムが終了する
  • amonth.py -> sort_to_data.pyのプログラムが終了したのち、リアルタイムに更新されるベーステーブルのデータを1ヶ月表記用のテーブルに仕分けしていく
  • aweek.py -> sort_to_data.pyのプログラムが終了したのち、リアルタイムに更新されるベーステーブルのデータを1週間表記用のテーブルに仕分けしていく
  • aday.py -> sort_to_data.pyのプログラムが終了したのち、リアルタイムに更新されるベーステーブルのデータを1日表記用のテーブルに仕分けしていく
  • config.ini -> データベースなどの設定情報を記載(※make_BaseTable.pyと情報を共有している)

APIからデータを取得するプログラムの全体像は以下の様になります。

   APIから過去データを取得しベーステーブル作成(make_BaseTable.py)
-> 過去データの取得が終わり、リアルタイムデータ収集に移行
-> sort_to_table.pyで過去データを各期間用のテーブルに仕分け
-> cronでaday.py(5分間隔)、aweek.py(30分間隔)、amonth.py(2時間間隔)を一定時間ごとに実行
   ※ sort_to_table.pyがどの程度の時間で終了するかわからないため、あらかじめこれらのデータを橋わせておく
     -> そのため各プログラムの最初にif文で状況を判断できる様にしてある
-> 現在時点まできたらsort_to_table.py終了するも、aday.py、aweek.py、amonth.pyで実行

1-2. プログラムの作り

各プログラムの作りは以下の様になっています。

1-2-1. sort_to_data.py

下記が「sort_to_data.py」の記載内容です。

"""
ベーステーブルから1ヶ月,1週間,1日表記用のテーブルにデータを振り分けるための実行ファイル。
DB接続設定: DBserverA
amonth(): 1ヶ月表記用テーブルへのデータの保存
aweek() : ワンウィーク表記用テーブルへのデータの保存
aday()  : ワンデイ表記用テーブルへのデータの保存
"""
import configparser
import os
import time
from db import get_db
config_file = os.path.join(os.path.abspath('.'), "config.ini")
cfg = configparser.ConfigParser()
cfg.read(config_file)
config = {
    "user": cfg["DBserverB"]["USER"],
    "host": cfg["DBserverB"]["HOST"],
    "password": cfg["DBserverB"]["PASSWORD"]
}
start_ts = float(cfg['SORT_CONFIG']['START_TIME'])
confirm_query = "SELECT timestamp from blowupbbs_crypto.bf_base ORDER BY no DESC LIMIT 1;"
select_query = "SELECT * FROM blowupbbs_crypto.bf_base WHERE timestamp >= %s AND %s > timestamp limit 1;"
sec2h = 60 * 60 * 2
sec30m = 60 * 30
sec5m = 60 * 5
def amonth(cur_obj, query, start_ts):
    """
    1ヶ月表記用テーブルの作成・更新
    2時間で1つのデータをbf_amonthテーブルに追加する
    (1) bf_baseテーブルからデータ取得
      想定事例: 取得するデータがなく空のリストが返ってきた場合 -> Falseを返す
    (2) bf_amonthテーブルにデータを追加
    :param cur_obj: Cursorオブジェクト
    :param query:  SQL文
    :param start_ts: <int> スタートタイム
    :return: データの追加完了した場合 -> True, データの追加がなかった場合 -> False
    """
    time.sleep(0.6)
    ts = (start_ts, start_ts + (60 * 60 * 2))
    # (1)SQL文の実行
    cur_obj.execute(query, ts)
    data = cur_obj.fetchone()
    # 想定事例: 取得するデータがなく空のリストが返ってきた場合 -> Falseを返す
    if data is None:
        return False
    insert_query = "INSERT INTO blowupbbs_crypto.bf_amonth (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
    # (2)bf_amonthテーブルにデータを追加
    try:
        cur_obj.execute(insert_query, data)
        con_obj.commit()
    except Exception as err:
        cur_obj.close()
        con_obj.close()
        print(err)
        exit()
    return True
def aweek(cur_obj, query, start_ts):
    """
    1週間表記用テーブルの作成・更新
    2時間分のデータを30分に1つ計4つを取得する
    ・ループ
    (1) bf_baseテーブルからデータ取得
      想定事例: 取得するデータがなく空のリストが返ってきた場合 -> Falseを返す
    (2) bf_aweekテーブルにデータを追加
    :param cur_obj:  Cursorオブジェクト
    :param query:  SQL文
    :param start_ts: <int> スタートタイム
    :return: True
    """
    for i in range(4):
        time.sleep(0.6)
        ts = (start_ts, start_ts + (60 * 30))
        # (1) bf_baseテーブルからデータ取得
        cur_obj.execute(query, ts)
        data = cur_obj.fetchone()
        # 想定事例: 取得するデータが空のリストで返ってきた場合 -> 次の区間へ進む
        if data is None:
            start_ts += (60 * 30)
            continue
        insert_query = "INSERT INTO blowupbbs_crypto.bf_aweek (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
        # (2) bf_aweekテーブルにデータを追加
        try:
            cur_obj.execute(insert_query, data)
            con_obj.commit()
        except Exception as err:
            cur_obj.close()
            con_obj.close()
            print(err)
            exit()
        start_ts += (60 * 30)
    return True
def aday(cur_obj, query, start_ts):
    """
    1日表記用テーブルの作成・更新
    2時間分のデータを5分に1つ計24つを取得する
    ・ループ
    (1) bf_baseテーブルからデータ取得
      想定事例: 取得するデータがなく空のリストが返ってきた場合 -> Falseを返す
    (2) bf_adayテーブルにデータを追加
    :param cur_obj: Cursorオブジェクト
    :param query: SQL文
    :param start_ts: <int> スタートタイム
    :return: Ture
    """
    for i in range(24):
        time.sleep(0.6)
        ts = (start_ts, start_ts + (60 * 5))
        # (1) bf_baseテーブルからデータ取得
        cur_obj.execute(query, ts)
        data = cur_obj.fetchone()
        # 想定事例: 取得するデータがなく空のリストが返ってきた場合 -> Falseを返す
        if data is None:
            start_ts += (60 * 5)
            continue
        insert_query = "INSERT INTO blowupbbs_crypto.bf_aday (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
        # (2) bf_adayテーブルにデータを追加
        try:
            cur_obj.execute(insert_query, data)
            con_obj.commit()
        except Exception as err:
            cur_obj.close()
            con_obj.close()
            print(err)
            exit()
        start_ts += (60 * 5)
    return True
# 実行プログラム
"""
ループを回して各期間用テーブルを作成
スタートタイム = 整理を開始するタイムスタンプ
(1)スタートタイムと現在時刻に2時間以上の隔たりがある
  この2時間内でデータが存在しない -> 次の2時間に進む
                    存在する -> amonthとaweekメソッド実行
(2)スタートタイム+30分 < 現在時刻 < スタートタイム+2時間
  対象期間にデータがない -> 振り分けプログラム終了
                 ある -> adayメソッドを実行し振り分けプログラム終了
(3)スタートタイム+5分 < 現在時刻 < スタートタイム+30分
  adayメソッドを実行し振り分けプログラム終了
(4)現在時刻 < スタートタイム+5分
  振り分けプログラム終了
 
"""
if __name__ == "__main__":
    con_obj, cur_obj = get_db(config)
    while True:
        time.sleep(0.6)
        current_time = time.time()
        # (1) 各期間用テーブルへのデータ振り分けが不十分な場合
        if (start_ts + sec2h) <= current_time:
            v = amonth(cur_obj, select_query, start_ts)
            if not v:
                start_ts += sec2h
                continue
            aweek(cur_obj, select_query, start_ts)
            aday(cur_obj, select_query, start_ts)
        # (2)スタートタイム+30分 < 現在時刻 < スタートタイム+2時間
        elif (start_ts + sec30m) <= current_time < (start_ts + sec2h):
            v = aweek(cur_obj, select_query, start_ts)
            if not v:
                break
            aday(cur_obj, select_query, start_ts)
            break
        # (3)スタートタイム+5分 < 現在時刻 < スタートタイム+30分
        elif (start_ts + sec5m) <= current_time < (start_ts + sec30m):
            aday(cur_obj, select_query, start_ts)
            break
        # (4)現在時刻 < スタートタイム+5分
        else:
            break
        start_ts += sec2h
    cur_obj.close()
    con_obj.close()

1-2-2. aday.py

「aday.py」の記載内容は以下になります。

"""
ワンデイ表記用のテーブルを作成するための実行ファイル
過去データはsort_to_data.pyで仕分け、リアルタイムはこのaday.pyで仕分け
※ cronで5分ごとに実行する
(1) 設定の読み込み
(2) 現在時刻とbf_adayテーブルの最終データ時刻を比較
    5分以上 -> sort_to_table.pyが稼働中であり、このプログラムの実行機会でないため、実行終了
    5分未満 -> bf_baseテーブルからデータを1つ取得し、bf_adayテーブルに追加
"""
import configparser
import os
import time
from db import get_db
# (1)設定の読み込み
config_file = os.path.join(os.path.abspath('.'), "config.ini")
cfg = configparser.ConfigParser()
cfg.read(config_file)
config = {
    "user": cfg["DBserverT"]["USER"],
    "host": cfg["DBserverT"]["HOST"],
    "password": cfg["DBserverT"]["PASSWORD"]
}
confirm_query = "SELECT timestamp FROM blowupbbs_crypto.bf_aday ORDER BY no DESC LIMIT 1;"
select_query = "SELECT * FROM blowupbbs_crypto.bf_base ORDER BY no DESC limit 1;"
insert_query = "INSERT INTO blowupbbs_crypto.bf_aday (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
current_time = time.time()
con_obj, cur_obj = get_db(config)
cur_obj.execute(confirm_query)
last_time = cur_obj.fetchone()[0]
# (2) 現在時刻とbf_adayの最終データ時刻を比較
if (current_time - last_time) > (60*5):
    pass
else:
    current_time = (current_time,)
    cur_obj.execute(select_query)
    data = cur_obj.fetchone()
    cur_obj.execute(insert_query, data)
    con_obj.commit()
cur_obj.close()
con_obj.close()

1-2-3. aweek.py

「aweek.py」の記載内容は以下になります。

"""
ワンウィーク表記用のテーブルを作成するための実行ファイル
過去データはsort_to_data.pyで仕分け、リアルタイムはこのaweek.pyで仕分け
※ cronで5分ごとに実行する
(1) 設定の読み込み
(2) 現在時刻とbf_aweekテーブルの最終データ時刻を比較
    5分以上 -> sort_to_table.pyが稼働中であり、このプログラムの実行機会でないため、実行終了
    5分未満 -> bf_baseテーブルからデータを1つ取得し、bf_aweekテーブルに追加
"""
import configparser
import os
import time
from db import get_db
# (1) 設定の読み込み
config_file = os.path.join(os.path.abspath('.'), "config.ini")
cfg = configparser.ConfigParser()
cfg .read(config_file)
config = {
    "user": cfg["DBserverT"]["USER"],
    "host": cfg["DBserverT"]["HOST"],
    "password": cfg["DBserverT"]["PASSWORD"]
}
confirm_query = "SELECT timestamp FROM blowupbbs_crypto.bf_aweek ORDER BY no DESC LIMIT 1;"
select_query = "SELECT * FROM blowupbbs_crypto.bf_base ORDER BY no DESC limit 1;"
insert_query = "INSERT INTO blowupbbs_crypto.bf_aweek (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
current_time = time.time()
con_obj, cur_obj = get_db(config)
cur_obj.execute(confirm_query)
last_time = cur_obj.fetchone()[0]
# (2) 現在時刻とbf_aweekテーブルの最終データ時刻を比較
if (current_time - last_time) > (60*30):
    pass
else:
    current_time = (current_time,)
    cur_obj.execute(select_query)
    data = cur_obj.fetchone()
    cur_obj.execute(insert_query, data)
    con_obj.commit()
cur_obj.close()
con_obj.close()

1-2-4. amonth.py

「amonth.py」の記載内容です。

"""
ワンマンス表記用のテーブルを作成するための実行ファイル
過去データはsort_to_data.pyで仕分け、リアルタイムはこのamonth.pyで仕分け
※ cronで5分ごとに実行する
(1) 設定の読み込み
(2) 現在時刻とbf_amonthテーブルの最終データ時刻を比較
    5分以上 -> sort_to_table.pyが稼働中であり、このプログラムの実行機会でないため、実行終了
    5分未満 -> bf_baseテーブルからデータを1つ取得し、bf_amonthテーブルに追加
"""
"""
1ヶ月表記用のテーブルを作成するための実行ファイル。
cronで2時間分ごとに実行する
DB接続設定: DBserverE
"""
import configparser
import os
import time
from db import get_db
# (1) 設定の読み込み
config_file = os.path.join(os.path.abspath('.'), "config.ini")
cfg = configparser.ConfigParser()
cfg.read(config_file)
config = {
    "user": cfg["DBserverT"]["USER"],
    "host": cfg["DBserverT"]["HOST"],
    "password": cfg["DBserverT"]["PASSWORD"]
}
confirm_query = "SELECT timestamp FROM blowupbbs_crypto.bf_amonth ORDER BY no DESC LIMIT 1;"
select_query = "SELECT * FROM blowupbbs_crypto.bf_base  ORDER BY no DESC limit 1;"
insert_query = "INSERT INTO blowupbbs_crypto.bf_amonth (base_no, id, price, timestamp) VALUE (%s,%s,%s,%s);"
current_time = time.time()
con_obj, cur_obj = get_db(config)
cur_obj.execute(confirm_query)
last_time = cur_obj.fetchone()[0]
# (2) 現在時刻とbf_amonthテーブルの最終データ時刻を比較
if (current_time - last_time) > (60*60*2):
    pass
else:
    current_time = (current_time,)
    cur_obj.execute(select_query)
    data = cur_obj.fetchone()
    cur_obj.execute(insert_query, data)
    con_obj.commit()
cur_obj.close()
con_obj.close()

1-2-5. config.ini

「config.ini」の記載事項です。実行環境では、make_BaseTable.pyと共有しています。

[SORT_CONFIG]
START_TIME = 1545607000.483
[DBserverT]
USER = <db_user>
HOST = <db_host>
PASSWORD = <db_password>

2. まとめ

以上がそれぞれの期間に対応したテーブルを作成するプログラムです。このチャートアプリを作成する上で自分で考え出した大きな特徴の一つであり、細部など至らない点はまだ多いですが、良い実装の一つではないかと考える部分です。次回はPythonのWEBアプリケーションフレームワークのDashを利用したチャート表示部分のプログラムについてです。