開発情報・ナレッジ

投稿者: ShiningStar株式会社 2024年8月22日 (木)

GoogleColab上でSPIRALのDBのファイルをGoogleドライブにアップロードしてみた

GoogleColab を使ってSPIRALのDBにアップロードされているファイルをGoogleDriveにアップロードするプログラムを作成してみました。
GoogleColabとはGoogleの環境上でPythonコードを実行出来るサービスとなります。
GoogleColabを利用することで他のGoogleサービスとの連携が簡単にできるので、お試しください。


全体像

GoogleDriveをマウントし操作可能にした上で、
SPIRALのAPIを使用してファイルをダウンロードして一時ファイルに保存し、
そのファイルをGoogleドライブへコピーしてアップロードします。

DB設定

任意のDBを作成し、ファイル型フィールドを追加してください。
この際ファイルフィールドのフィールド識別名を控えておいてください。

サンプルプログラム

import requests
import os
import shutil
import time
import base64
import json
from google.colab import drive

# Googleドライブをマウント
drive.mount('/content/drive')

# 設定値
LOCATOR_URL = "https://www.pi-pe.co.jp/api/locator"
API_TOKEN = ""  # SPIRALのAPIトークン
API_SECRET = ""  # SPIRALのシークレット
db_title = "fileTest"  # データベースのタイトル
file_field_title = "file"  # ファイル型フィールドのタイトル
select_columns = ["id"]  # レコードのIDを取得するフィールド(変更不要)
delete_flg = 1  # ファイル削除フラグ(1: 削除する、0: 削除しない)
google_drive_folder_path = "/content/drive/My Drive/fromspiral/"  # アップロード先フォルダパス

def get_api_url():
    headers = {
        "X-SPIRAL-API": "locator/apiserver/request",
        "Content-Type": "application/json; charset=UTF-8"
    }
    data = {
        "spiral_api_token": API_TOKEN
    }
    response = requests.post(LOCATOR_URL, headers=headers, json=data)
    if response.status_code != 200:
        raise Exception(f"Locator API call failed: {response.text}")
    return response.json()['location']

def generate_signature(token, passkey, secret):
    import hashlib
    import hmac
    message = f"{token}&{passkey}"
    signature = hmac.new(secret.encode('utf-8'), message.encode('utf-8'), hashlib.sha1).hexdigest()
    return signature

def get_records(api_url):
    headers = {
        "X-SPIRAL-API": "database/select/request",
        "Content-Type": "application/json; charset=UTF-8"
    }
    passkey = int(time.time())

    data = {
        "spiral_api_token": API_TOKEN,
        "db_title": db_title,
        "passkey": passkey,
        "signature": generate_signature(API_TOKEN, passkey, API_SECRET),
        "select_columns": select_columns,
        "lines_per_page": 1000,
        "page": 1
    }

    all_records = []
    while True:
        response = requests.post(api_url, headers=headers, json=data)
        if response.status_code != 200:
            raise Exception(f"Database select API call failed: {response.text}")

        response_json = response.json()
        if response_json['code'] != '0':
            raise Exception(f"API Error: {response_json['message']}")

        header = response_json['header']
        data_list = response_json['data']

        for row in data_list:
            record = {header[i]: row[i] for i in range(len(header))}
            all_records.append(record)

        if len(data_list) < data['lines_per_page']:
            break
        data["page"] += 1

    return all_records

def get_file(api_url, record_id):
    headers = {
        "X-SPIRAL-API": "database/get_file/request",
        "Content-Type": "application/json; charset=UTF-8"
    }
    passkey = int(time.time())
    data = {
        "spiral_api_token": API_TOKEN,
        "passkey": passkey,
        "db_title": db_title,
        "file_field_title": file_field_title,
        "key_field_title": "id",
        "key_field_value": record_id,
        "signature": generate_signature(API_TOKEN, passkey, API_SECRET)
    }
    response = requests.post(api_url, headers=headers, json=data)
    if response.status_code != 200:
        raise Exception(f"Database get_file API call failed: {response.text}")

    response_json = response.json()
    file_name = response_json.get('file_name')
    file_data = base64.b64decode(response_json.get('data')) if response_json.get('data') else None

    return file_data, file_name

def delete_file(api_url, record_id):
    passkey = int(time.time())
    signature = generate_signature(API_TOKEN, passkey, API_SECRET)

    # JSONデータ部分
    json_part = {
        "spiral_api_token": API_TOKEN,
        "passkey": passkey,
        "signature": signature,
        "db_title": db_title,
        "search_condition": [{"name": "id", "value": record_id, "operator": "="}],
        "data": [{"name": file_field_title, "value": ""}]
    }

    json_data = json.dumps(json_part)
    boundary = "----------" + str(time.time()).replace('.', '')

    body = f"--{boundary}\r\n"
    body += "Content-Type: application/json; charset=UTF-8\r\n"
    body += "Content-Disposition: form-data; name=\"json\"\r\n\r\n"
    body += json_data + "\r\n"
    body += f"--{boundary}--\r\n"

    headers = {
        "X-SPIRAL-API": "database/update/request",
        "Content-Type": f"multipart/form-data; boundary={boundary}"
    }

    response = requests.post(api_url, headers=headers, data=body)
    if response.status_code != 200:
        raise Exception(f"Database update (delete file) API call failed: {response.text}")
    print(f"レコード '{record_id}' のファイルが削除されました。")

# メイン処理
try:
    api_url = get_api_url()
    records = get_records(api_url)
    #print(records) レコードが正しく取得できているか確認

    for record in records:
        record_id = record["id"]
        file_data, file_name = get_file(api_url, record_id)
        #print(file_name) レコードが正しく取得できているか確認
        #print(file_data) レコードが正しく取得できているか確認
        record_id = record["id"]
        file_data, file_name = get_file(api_url, record_id)

        if file_data and file_name:
            local_file_path = os.path.join('/tmp', file_name)
            with open(local_file_path, 'wb') as file:
                file.write(file_data)

            for attempt in range(3):
                try:
                    drive_file_path = os.path.join(google_drive_folder_path, file_name)
                    shutil.copy2(local_file_path, drive_file_path)
                    print(f"ファイル '{file_name}' のGoogle Driveへのアップロードが成功しました。")
                    os.remove(local_file_path)
                    break
                except Exception as e:
                    print(f"アップロード中にエラーが発生しました: {e}")
                    time.sleep(5)
            else:
                print(f"ファイル '{file_name}' のGoogle Driveへのアップロードに失敗しました。")
            
            #アップロード完了後の処理を追加したい場合は下記へ追加してください。
            if delete_flg == 1:
                delete_file(api_url, record_id)

except Exception as e:
    print(f"処理中にエラーが発生しました: {e}")
 
            

GoogleDriveのパスの確認方法

データをアップロードする時にアップロードしたいファイルのパスを確認する必要があります。

from google.colab import drive

# Google Driveをマウント
drive.mount('/content/drive') 
            

こちらのGoogleDriveをマウントする記述のみをコードブロックに貼り付けて実行する事で、
サイドバーのファイル欄からディレクトリを確認することができます。

設定方法

GoogleColabへアクセスして新規ノートブックを作成してください。
作成したら上記サンプルプログラムをそのまま貼り付けてください。
GoogleDriveのパスがご不明な方は上記のGoogleDriveのパスの確認方法に従ってパスを確認してください。
後に、

# 設定値
LOCATOR_URL = "https://www.pi-pe.co.jp/api/locator"
API_TOKEN = ""  # SPIRALのAPIトークン
API_SECRET = ""  # SPIRALのシークレット
db_title = "fileTest"  # データベースのタイトル
file_field_title = "file"  # ファイル型フィールドのタイトル
select_columns = ["id"]  # レコードのIDを取得するフィールド(変更不要)
delete_flg = 1  # レコード削除フラグ(1: 削除する、0: 削除しない)
google_drive_folder_path = "/content/drive/My Drive/fromspiral/"  # アップロード先フォルダパス

こちらをお使いの環境に合わせて修正してください。
その他は変更せずに稼働いたします。

修正が終わったら、
左上にあります再生ボタンの様なボタンを押すとプログラムが実行され、
SPIRALのファイルがGoogleドライブへアップロードされます。
本サンプルプログラムではGoogleドライブへアップロードが完了したファイルを削除する機能も実装してあります。
削除したい場合はdelete_flgを1、削除したくない場合は0を指定して実行してください。

ファイル削除以外の処理をアップロード完了時に行いたい場合は、
「#アップロード完了後の処理を追加したい場合は下記へ追加してください。」
以下に条件式等含めて記載してください。

うまく動作しない場合

Googleドライブ側でアップロードができているか確認する際に、
1分ほどラグがある可能性がありますので確認する際は少し時間を置いて確認してください。

SPIRALやGoogleドライブのAPIを実行した結果をprintする様にコメントアウトで記載しています。
SPIRALのデータがダウンロード完了しているか、Googleドライブにアップロード成功しているか、ファイルの削除の更新APIがうまく動作しているか、
各種レスポンスを表示して確認してください。

その他

GoogleColab上では他のGoogleサービスとの連携がかなり容易でAPI認証等必要ないので、
例えば他には容易にGmailのデータをSPIRALに格納することが出来ます。
GoogleサービスとSPIRALの連携には非常に良い環境なので、
GoogleサービスとSPIRALのデータ連携をする場合は是非GoogleColabの利用をご検討頂けると幸いです。
また、外部環境との連携ですのでデータの取扱には十分にご注意ください。

解決しない場合はこちら コンテンツに関しての
要望はこちら