首页 >后端开发 >Python教程 >我更新了 BoardGameGeek 数据的 Python 获取器

我更新了 BoardGameGeek 数据的 Python 获取器

Patricia Arquette
Patricia Arquette原创
2024-10-15 22:15:30357浏览

I

此脚本将从 BoardGameGeek API 获取项目数据并将数据存储在 CSV 文件中。

我更新了之前的脚本。由于 API 响应采用 XML 格式,并且没有端点可以一次获取所有项目,因此前面的脚本将循环遍历提供的 ID 范围,对每个项目进行逐一调用。这不是最优的,对于更大范围的 ID 需要很长时间(目前 BGG 上可用的最高项目(ID)数量高达 400k ),并且结果可能不可靠。因此,通过对此脚本的一些修改,更多的项目ID将作为参数值添加到单个请求url中,这样,单个响应将返回多个项目(〜800是单个响应返回的最高数量。BGG稍后可能会更改它;您可以轻松调整batch_size以便根据需要进行调整)。

此外,此脚本将获取所有项目,而不仅仅是与棋盘游戏相关的数据。

为每个棋盘游戏获取和存储的信息如下:

名称、游戏 ID、类型、评级、权重、发布年份、最小玩家数、最大玩家数、最短游戏时间、最大支付时间、最小年龄、所属者、类别、机制、设计师、艺术家和发行商。

该脚本的更新如下;我们首先导入此脚本所需的库:

# Import libraries
from bs4 import BeautifulSoup
from csv import DictWriter
import pandas as pd
import requests
import time

以下是脚本完成时根据 ID 范围调用的函数。此外,如果发出请求时出现错误,将调用此函数以存储截至异常发生时附加到游戏列表的所有数据。

# CSV file saving function
def save_to_csv(games):
    csv_header = [
        'name', 'game_id', 'type', 'rating', 'weight', 'year_published', 'min_players', 'max_players',
        'min_play_time', 'max_play_time', 'min_age', 'owned_by', 'categories',
        'mechanics', 'designers', 'artists', 'publishers'
    ]
    with open('bgg.csv', 'a', encoding='UTF8') as f:
        dictwriter_object = DictWriter(f, fieldnames=csv_header)
        if f.tell() == 0:
            dictwriter_object.writeheader()
        dictwriter_object.writerows(games)

我们需要定义请求的标头。请求之间的暂停可以通过 SLEEP_BETWEEN_REQUESTS 设置(我看到一些信息说速率限制是每秒 2 个请求,但它可能是过时的信息,因为我将暂停设置为 0 没有遇到问题)。另外,这里设置起始点ID(start_id_range)、最大范围(max_id_range)和batch_size的值,batch_size是响应应该返回的游戏数量。基本 url 在本节中定义,但 ID 在脚本的下一部分中添加。

# Define request url headers
headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.16; rv:85.0) Gecko/20100101 Firefox/85.0",
    "Accept-Language": "en-GB, en-US, q=0.9, en"
}

# Define sleep timer value between requests
SLEEP_BETWEEN_REQUEST = 0

# Define max id range
start_id_range = 0
max_id_range = 403000
batch_size = 800
base_url = "https://boardgamegeek.com/xmlapi2/thing?id="

下面是这个脚本的主要逻辑。首先,根据batch size,它会生成一个ID字符串,这些ID在定义的ID范围内,但ID的数量不能超过batch_size中定义的数量,并将其附加到url的id参数中。这样,每个响应将返回与批量大小相同的项目数量的数据。之后,它将处理数据并将其附加到每个响应的游戏列表中,最后附加到 CSV 文件中。

# Main loop that will iterate between the starting and maximum range in intervals of the batch size
for batch_start in range(start_id_range, max_id_range, batch_size):
    # Make sure that the batch size will not exceed the maximum ids range
    batch_end = min(batch_start + batch_size - 1, max_id_range)
    # Join and append to the url the IDs within batch size
    ids = ",".join(map(str, range(batch_start, batch_end + 1)))
    url = f"{base_url}?id={ids}&stats=1"

    # If by any chance there is an error, this will throw the exception and continue on the next batch
    try:
        response = requests.get(url, headers=headers)
    except Exception as err:
        print(err)
        continue


    if response.status_code == 200:
        soup = BeautifulSoup(response.text, features="html.parser")
        items = soup.find_all("item")
        games = []
        for item in items:
            if item:
                try:
                    # Find values in the XML
                    name = item.find("name")['value'] if item.find("name") is not None else 0
                    year_published = item.find("yearpublished")['value'] if item.find("yearpublished") is not None else 0
                    min_players = item.find("minplayers")['value'] if item.find("minplayers") is not None else 0
                    max_players = item.find("maxplayers")['value'] if item.find("maxplayers") is not None else 0
                    min_play_time = item.find("minplaytime")['value'] if item.find("minplaytime") is not None else 0
                    max_play_time = item.find("maxplaytime")['value'] if item.find("maxplaytime") is not None else 0
                    min_age = item.find("minage")['value'] if item.find("minage") is not None else 0
                    rating = item.find("average")['value'] if item.find("average") is not None else 0
                    weight = item.find("averageweight")['value'] if item.find("averageweight") is not None else 0
                    owned = item.find("owned")['value'] if item.find("owned") is not None else 0


                    link_type = {'categories': [], 'mechanics': [], 'designers': [], 'artists': [], 'publishers': []}

                    links = item.find_all("link")

                    # Append value(s) for each link type
                    for link in links:                            
                        if link['type'] == "boardgamecategory":
                            link_type['categories'].append(link['value'])
                        if link['type'] == "boardgamemechanic":
                            link_type['mechanics'].append(link['value'])
                        if link['type'] == "boardgamedesigner":
                            link_type['designers'].append(link['value'])
                        if link['type'] == "boardgameartist":
                            link_type['artists'].append(link['value'])
                        if link['type'] == "boardgamepublisher":
                            link_type['publishers'].append(link['value'])

                    # Append 0 if there is no value for any link type
                    for key, ltype in link_type.items():
                        if not ltype:
                            ltype.append("0")

                    game = {
                        "name": name,
                        "game_id": item['id'],
                        "type": item['type'],
                        "rating": rating,
                        "weight": weight,
                        "year_published": year_published,
                        "min_players": min_players,
                        "max_players": max_players,
                        "min_play_time": min_play_time,
                        "max_play_time": max_play_time,
                        "min_age": min_age,
                        "owned_by": owned,
                        "categories": ', '.join(link_type['categories']),
                        "mechanics": ', '.join(link_type['mechanics']),
                        "designers": ', '.join(link_type['designers']),
                        "artists": ', '.join(link_type['artists']),
                        "publishers": ', '.join(link_type['publishers']),
                    }

                    # Append current item to games list
                    games.append(game)
                except TypeError:
                    print(">>> NoneType error. Continued on the next item.")
                    continue
        save_to_csv(games)

        print(f">>> Request successful for batch {batch_start}-{batch_end}")
    else:
        print(f">>> FAILED batch {batch_start}-{batch_end}")

    # Pause between requests
    time.sleep(SLEEP_BETWEEN_REQUEST)

下面您可以以 pandas DataFrame 的形式预览 CSV 文件中的前几行记录。

# Preview the CSV as pandas DataFrame
df = pd.read_csv('./bgg.csv')
print(df.head(5))

以上是我更新了 BoardGameGeek 数据的 Python 获取器的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn