61 lines
1.6 KiB
Python
61 lines
1.6 KiB
Python
import user_agents
|
|
import random
|
|
import requests
|
|
import re
|
|
import os
|
|
import json
|
|
from concurrent import futures
|
|
|
|
|
|
def player_archives(username):
    """Fetch a player's list of monthly archives from chess.com and cache it.

    Requests ``https://api.chess.com/pub/player/<username>/games/archives``
    with a randomly chosen User-Agent header, then dumps the returned list as
    JSON to ``archives/<username>_archive.txt`` under the current working
    directory.

    Args:
        username: Player name; interpolated into the request URL and used as
            the prefix of the cache file name.

    Returns:
        The value stored under the ``'archives'`` key of the JSON response.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        KeyError: If the response JSON has no ``'archives'`` key.
        OSError: If the cache directory cannot be created or the cache file
            cannot be written.
    """
    user_agent = random.choice(user_agents.user_agents)
    headers = {'User-Agent': user_agent}
    url = f"https://api.chess.com/pub/player/{username}/games/archives"
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    archive = response.json()['archives']

    # Build the path outside any try block so it is always bound, and only
    # tolerate "directory already exists"; real failures (permissions, a
    # vanished cwd) surface here instead of as a confusing error further down.
    path = os.path.join(os.getcwd(), 'archives')
    os.makedirs(path, exist_ok=True)

    file_name = os.path.join(path, username + '_archive.txt')
    with open(file_name, 'w') as f:
        json.dump(archive, f)

    return archive
|
|
|
|
|
|
def player_monthly(url=None):
    """Download one month of a player's games from chess.com as PGN text.

    Args:
        url: Monthly archive URL. When truthy, ``/pgn`` is appended to it.
            When omitted or falsy, the username, year and month are read
            interactively from stdin and the URL is built from them.

    Returns:
        The response body decoded as UTF-8.

    Raises:
        requests.HTTPError: If the API responds with an error status.
        ValueError: If the year or month typed at the prompt is not an integer.
    """
    headers = {'User-Agent': random.choice(user_agents.user_agents)}

    if not url:
        # Interactive mode: prompt for each component of the archive URL.
        player = input("username: ")
        yyyy = int(input("year in YYYY: "))
        mm = int(input("month in MM: "))
        target = f"https://api.chess.com/pub/player/{player}/games/{yyyy}/{mm:02d}/pgn"
    else:
        target = url + '/pgn'

    reply = requests.get(target, headers=headers)
    reply.raise_for_status()
    return reply.content.decode('utf-8')
|
|
|
|
|
|
def threaded_games(username=None, archive=None):
    """Download all monthly PGNs for a player in parallel and save them to one file.

    Each entry of ``archive`` is passed to ``player_monthly`` on a thread
    pool; the resulting texts are joined with a blank line between them and
    written to ``<username>.pgn`` in the current working directory.

    Args:
        username: Player name. Prompted for on stdin when omitted or empty,
            because it is needed for the output file name even if ``archive``
            is supplied.
        archive: Iterable of monthly archive URLs. When falsy, it is fetched
            with ``player_archives(username)``.

    Returns:
        The combined PGN text that was written to the file.
    """
    # Resolve the name once, up front. Previously a name typed at the prompt
    # was passed straight to player_archives and discarded, so building the
    # output file name from ``username`` (still None) raised TypeError.
    if not username:
        username = input("username: ")

    if not archive:
        archive = player_archives(username)

    with futures.ThreadPoolExecutor() as executor:
        # Materialize inside the ``with`` so any worker exception is raised
        # here; executor.map yields results in the order of ``archive``.
        results = list(executor.map(player_monthly, archive))

    pgn_db = '\n\n'.join(results)

    # The PGN text was decoded from UTF-8, so write it back as UTF-8 rather
    # than the platform default, and actually persist it: the file was
    # previously opened for writing but nothing was written.
    with open(username + '.pgn', 'w', encoding='utf-8') as f:
        f.write(pgn_db)

    return pgn_db
|