リレーショナルデータベースのテーブルでリストを扱う例(SQLite 3、Python を使用)
【概要】リレーショナルデータベースのテーブルにおいて、リスト L = (e1, e2, ..., en) を {(1, e1), (2, e2), ..., (n, en)} の形式でマッピングする手法を解説する。リレーションスキーマは R(要素番号、要素値) となり、複数のリスト {L1, L2, ..., Lk} をマッピングする場合は R(リスト番号、要素番号、要素値) を採用する。SQLite 3 と Python を用いた実装例と性能評価を示す。
【目次】
前準備
SQLite 3 の詳細説明は 別ページ » を参照すること。
テストデータ生成用 Python プログラム
CSV 形式のテストデータを生成し、標準出力へ出力する Python プログラムである。このプログラムは、指定された数のリストと各リストの要素数に基づいて、テストデータを自動生成する。
生成されるデータの各行は、以下の属性を持つ。
- id: 一意の識別子
- list_num: リスト番号
- item_num: リスト内の要素番号
- x: ランダムな実数値(0.0 から 100.0 の範囲)
- y: ランダムな実数値(0.0 から 100.0 の範囲)
- price: ランダムな整数値(1 から 1000 の範囲)
- name: ランダムな文字列(長さ 8 文字)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# usage: python hoge.py 10 5
import sys
import random
import string
LEN = 8
c = 0
TOTAL_LIST_NUM = int(sys.argv[1])
LIST_LEN = int(sys.argv[2])
print("# id, list_num, item_num, x, y, price, name")
for i in range(1, TOTAL_LIST_NUM + 1):
for j in range(1, LIST_LEN + 1):
id = c
list_num = i
item_num = j
x = 100 * random.random()
y = 100 * random.random()
price = int(1000 * random.random()) + 1
name = ''.join(random.choices(string.ascii_letters + string.digits, k=LEN))
print(f"{id}, {list_num}, {item_num}, {x:.6f}, {y:.6f}, {price}, {name}")
c += 1
テストデータベースの生成手順
上記の Python プログラムを hoge.py として保存し、以下の手順でテストデータベースを生成する。この例では、10 個のリストを生成し、各リストは 5 個の要素を持つ。
生成されるデータベースの仕様は以下のとおりである。
- データベースファイル名: /tmp/1.db
- テーブル名: dat
- リスト数: 10
- 各リストの要素数: 5
#!/bin/bash
rm -f /tmp/1.csv
python /tmp/hoge.py 10 5 | fgrep -v '#' > /tmp/1.csv
rm -f /tmp/1.$$.sql
cat >/tmp/1.$$.sql << SQL
create table dat (
id integer primary key not null,
list_num integer,
item_num integer,
x real,
y real,
price integer,
name text );
.mode csv
.import /tmp/1.csv dat
.exit
SQL
rm -f /tmp/1.db
cat /tmp/1.$$.sql | sqlite3 /tmp/1.db
データベースの内容を確認するには、以下のコマンドを実行する。
sqliteman /tmp/1.db
エッジリストの生成プロセス
ノードリストの集合 {L1, L2, ..., Lk} において、各集合 Li は一連のノードで構成される。これらのノードリスト集合をリレーショナルデータベースのテーブル dat にマッピングし、自己結合操作を実行する。この結果、各行が 1 つのエッジ(2 つのノードを接続する辺)を表現するテーブルが生成される。
エッジリストの生成には、以下の SQL 文を使用する。この SQL 文は、同一リスト内で連続する要素のペアを抽出する。WHERE 句の条件 A.item_num + 1 = B.item_num AND A.list_num = B.list_num により、リスト内で隣接する要素のみが結合される。
select A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num from dat A, dat B where A.item_num + 1 = B.item_num AND A.list_num = B.list_num;
大規模テストデータベース生成と性能評価
エッジリスト生成システムの信頼性と性能を検証するため、大規模なテストデータベースを用いた性能評価を実施する。評価では、リスト数を 500000、1000000、2000000、5000000 と変化させ、各リストの要素数は 5 で固定する。
評価に使用するデータベースの仕様は以下のとおりである。
- データベースファイル名: /tmp/500000.db、/tmp/1000000.db、/tmp/2000000.db、/tmp/5000000.db
- テーブル名: dat
- リスト数: 500000、1000000、2000000、5000000
- 各リストの要素数: 5
以下の Python スクリプトは、テストデータベースの作成、データの投入、性能評価を自動的に実行する。
#!/usr/bin/env python3
import os
import sqlite3
import subprocess
import sys
import time
def create_database(db_name):
sql = '''
create table dat (
id integer primary key not null,
list_num integer,
item_num integer,
x number,
y number,
price integer,
name text
);
'''
conn = sqlite3.connect(f'/tmp/{db_name}.db')
conn.execute(sql)
conn.close()
def populate_database(db_name, list_num, list_len):
# Generate CSV using Python script
csv_file = f'/tmp/{db_name}.csv'
subprocess.run([
'python', '/tmp/hoge.py', str(list_num), str(list_len)
], stdout=open(csv_file, 'w'))
# Import CSV to SQLite
conn = sqlite3.connect(f'/tmp/{db_name}.db')
with open(csv_file) as f:
next(f) # Skip header
conn.executemany(
'INSERT INTO dat VALUES (?,?,?,?,?,?,?)',
[line.strip().split(',') for line in f]
)
conn.commit()
conn.close()
def run_performance_test(db_name):
print(f'Testing {db_name}.db')
start_time = time.time()
conn = sqlite3.connect(f'/tmp/{db_name}.db')
conn.execute('DROP TABLE IF EXISTS T')
conn.execute('''
CREATE TABLE T AS
SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num
FROM dat A, dat B
WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num
''')
count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0]
conn.close()
end_time = time.time()
print(f'Count: {count}')
print(f'Time taken: {end_time - start_time:.2f} seconds\n')
def main():
test_sizes = [500000, 1000000, 2000000, 5000000]
# Create and populate databases
for size in test_sizes:
create_database(str(size))
populate_database(str(size), size, 5)
# Run performance tests
for size in test_sizes:
run_performance_test(str(size))
if __name__ == '__main__':
main()
二次索引の適用と性能評価
システム性能を最適化するため、二次索引を適用し、その効果を評価する。二次索引は item_num と list_num の組み合わせに対して作成される。これにより、エッジリスト生成時の検索性能が向上することが期待される。
以下の Python スクリプトは、二次索引を作成し、性能評価を実施する。
#!/usr/bin/env python3
import sqlite3
import time
def create_index_and_test(db_name):
print(f'Testing {db_name}.db with index')
# Create index
conn = sqlite3.connect(f'/tmp/{db_name}.db')
conn.execute('CREATE INDEX idx001 ON dat(item_num, list_num)')
conn.close()
# Run performance test
start_time = time.time()
conn = sqlite3.connect(f'/tmp/{db_name}.db')
conn.execute('DROP TABLE IF EXISTS T')
conn.execute('''
CREATE TABLE T AS
SELECT A.id, A.list_num, A.item_num, B.id, B.list_num, B.item_num
FROM dat A, dat B
WHERE A.item_num + 1 = B.item_num AND A.list_num = B.list_num
''')
count = conn.execute('SELECT COUNT(*) FROM T').fetchone()[0]
conn.close()
end_time = time.time()
print(f'Count: {count}')
print(f'Time taken: {end_time - start_time:.2f} seconds\n')
def main():
test_sizes = ['500000', '1000000', '2000000', '5000000']
for size in test_sizes:
create_index_and_test(size)
if __name__ == '__main__':
main()