※本記事はコードが読める人向けの記事です。
タイトルの通り『BlueskyのAPIから投稿を取得して削除するPythonコード』です。
自己責任でつかってください。
- 11、12行目:BlueSkyのAPI情報
- 65行目:取得する投稿数
- 78行目:削除する間隔
12行目のAPIパスワードはアカウントの設定から作れます。作り方は他所で調べてください。
65行目と78行目の値をいじると、一度の実行で削除する投稿数と削除のペースを変えられますが、取得数を多くしすぎたり削除のペースを早めるとリミットレート(利用制限)にひっかかるので、増やす・早める場合は少しずつ試してみてください。
import requests
import json
import time
# グローバル変数としてセッションデータを保持
session_data = None
def create_session():
url = 'https://bsky.social/xrpc/com.atproto.server.createSession'
payload = {
"identifier": "XXXXXXXX.bsky.social", # Blueskyのユーザー識別子
"password": "xxxx-xxxx-xxxx-xxxx" # BlueskyのAPIパスワード
}
headers = {
'Content-Type': 'application/json; charset=UTF-8',
}
for attempt in range(5): # 最大5回リトライ
response = requests.post(url, headers=headers, data=json.dumps(payload))
if response.status_code == 429: # Too Many Requests
print("Rate limit exceeded, waiting...")
time.sleep(2 ** attempt) # 待機時間を指数的に増加
continue
response.raise_for_status() # その他のエラーを確認
return response.json() # セッション情報を返す
raise Exception("Failed to create session after retries.")
def get_author_feed(limit=50):
global session_data
if session_data is None: # セッションがない場合のみ作成
session_data = create_session()
url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={session_data['handle']}&limit={limit}"
headers = {
'Authorization': f"Bearer {session_data['accessJwt']}"
}
response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json() # フィードデータを返す
def delete_record_by_uri(uri):
global session_data
if session_data is None: # セッションがない場合は作成
session_data = create_session()
# URIからrkeyを抽出
rkey = uri.split('/').pop()
url = 'https://bsky.social/xrpc/com.atproto.repo.deleteRecord'
payload = {
"repo": session_data["handle"],
"collection": "app.bsky.feed.post",
"rkey": rkey,
}
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer ' + session_data['accessJwt'],
}
response = requests.post(url, headers=headers, data=json.dumps(payload))
return response.status_code
def main():
post_limit = 100 # 取得したい投稿数を指定
feed_data = get_author_feed(limit=post_limit)
print(f"取得した投稿数: {len(feed_data['feed'])}件")
for i, post in enumerate(feed_data['feed']):
uri = post['post']['uri']
delete_status = delete_record_by_uri(uri)
if delete_status == 200:
print(f"Post {i + 1} deleted successfully.")
else:
print(f"Failed to delete post {i + 1}. Status code: {delete_status}")
time.sleep(0.05) # 削除する前に0.05秒の間隔をあける
if __name__ == "__main__":
main()
コメント