【PythonコピペでOK】BlueskyのAPIから投稿を取得して削除するPythonコード

※本記事はコードが読める人向けの記事です。

 タイトルの通り『BlueskyのAPIから投稿を取得して削除するPythonコード』です。

 自己責任でつかってください。

  • 11、12行目:BlueSkyのAPI情報
  • 65行目:取得する投稿数
  • 78行目:削除する間隔

 12行目のAPIパスワードはアカウントの設定から作れます。作り方は他所で調べてください。
 65行目と78行目の値をいじると、一度の実行で削除する投稿数と削除のペースを変えられますが、取得数を多くしすぎたり削除のペースを早めるとリミットレート(利用制限)にひっかかるので、増やす・早める場合は少しずつ試してみてください。

import requests
import json
import time

# グローバル変数としてセッションデータを保持
session_data = None

def create_session():
    url = 'https://bsky.social/xrpc/com.atproto.server.createSession'
    payload = {
        "identifier": "XXXXXXXX.bsky.social",  # Blueskyのユーザー識別子
        "password": "xxxx-xxxx-xxxx-xxxx"       # BlueskyのAPIパスワード
    }
    headers = {
        'Content-Type': 'application/json; charset=UTF-8',
    }

    for attempt in range(5):  # 最大5回リトライ
        response = requests.post(url, headers=headers, data=json.dumps(payload))
        if response.status_code == 429:  # Too Many Requests
            print("Rate limit exceeded, waiting...")
            time.sleep(2 ** attempt)  # 待機時間を指数的に増加
            continue
        response.raise_for_status()  # その他のエラーを確認
        return response.json()  # セッション情報を返す

    raise Exception("Failed to create session after retries.")

def get_author_feed(limit=50):
    global session_data
    if session_data is None:  # セッションがない場合のみ作成
        session_data = create_session()

    url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={session_data['handle']}&limit={limit}"
    headers = {
        'Authorization': f"Bearer {session_data['accessJwt']}"
    }

    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()  # フィードデータを返す

def delete_record_by_uri(uri):
    global session_data
    if session_data is None:  # セッションがない場合は作成
        session_data = create_session()
    
    # URIからrkeyを抽出
    rkey = uri.split('/').pop()
    url = 'https://bsky.social/xrpc/com.atproto.repo.deleteRecord'
    payload = {
        "repo": session_data["handle"],
        "collection": "app.bsky.feed.post",
        "rkey": rkey,
    }
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer ' + session_data['accessJwt'],
    }
    
    response = requests.post(url, headers=headers, data=json.dumps(payload))
    return response.status_code

def main():
    post_limit = 100  # 取得したい投稿数を指定
    feed_data = get_author_feed(limit=post_limit)
    print(f"取得した投稿数: {len(feed_data['feed'])}件")

    for i, post in enumerate(feed_data['feed']):
        uri = post['post']['uri']
        delete_status = delete_record_by_uri(uri)

        if delete_status == 200:
            print(f"Post {i + 1} deleted successfully.")
        else:
            print(f"Failed to delete post {i + 1}. Status code: {delete_status}")
        
        time.sleep(0.05)  # 削除する前に0.05秒の間隔をあける

if __name__ == "__main__":
    main()

コメント

タイトルとURLをコピーしました