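We are going to build a bot for the social network Bluesky using Golang. The bot will monitor a few hashtags over a websocket,
and when it finds one of them, it will repost and like the original post.
We will cover some very cool things along the way, such as websockets, AT (the protocol Bluesky uses), and CAR (Content Addressable Archive) and CBOR (Concise Binary Object Representation), two formats used to store and transmit data efficiently.
The project has a simple structure: internally there is a package called bot, which contains all the code that runs the bot,
and utils holds a few helper functions.
The .env file holds the Bluesky credentials used to access the API.
To authenticate against the Bluesky API we need to provide an identifier and a password, but we cannot use the password we log in to our account with.
Instead, we will create an app password: open your Bluesky account, go to Settings, then App Passwords.
Put the generated password in the .env file, like this: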
BLUESKY_IDENTIFIER=<your_identifier>
BLUESKY_PASSWORD=<your_app_password>
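Whenever the bot spots one of the hashtags we are monitoring, it makes a repost, but reposting requires a Bearer token.
We will write a function that generates the token, in the get-token.go file.
First, we define a global variable for the API URL.
var (
    API_URL = "https://bsky.social/xrpc"
)
Now we define the structs for the data the API returns.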
type DIDDoc struct {
    Context            []string `json:"@context"`
    ID                 string   `json:"id"`
    AlsoKnownAs        []string `json:"alsoKnownAs"`
    VerificationMethod []struct {
        ID                 string `json:"id"`
        Type               string `json:"type"`
        Controller         string `json:"controller"`
        PublicKeyMultibase string `json:"publicKeyMultibase"`
    } `json:"verificationMethod"`
    Service []struct {
        ID              string `json:"id"`
        Type            string `json:"type"`
        ServiceEndpoint string `json:"serviceEndpoint"`
    } `json:"service"`
}

type DIDResponse struct {
    DID             string `json:"did"`
    DIDDoc          DIDDoc `json:"didDoc"`
    Handle          string `json:"handle"`
    Email           string `json:"email"`
    EmailConfirmed  bool   `json:"emailConfirmed"`
    EmailAuthFactor bool   `json:"emailAuthFactor"`
    AccessJwt       string `json:"accessJwt"`
    RefreshJwt      string `json:"refreshJwt"`
    Active          bool   `json:"active"`
}
Now we create the getToken function, which returns a DIDResponse (you can give it any name you like).
func getToken() (*DIDResponse, error) {
    // The credentials come from the .env file.
    requestBody, err := json.Marshal(map[string]string{
        "identifier": os.Getenv("BLUESKY_IDENTIFIER"),
        "password":   os.Getenv("BLUESKY_PASSWORD"),
    })
    if err != nil {
        return nil, fmt.Errorf("failed to marshal request body: %w", err)
    }

    url := fmt.Sprintf("%s/com.atproto.server.createSession", API_URL)

    resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
    if err != nil {
        return nil, fmt.Errorf("failed to send request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    var tokenResponse DIDResponse
    if err := json.NewDecoder(resp.Body).Decode(&tokenResponse); err != nil {
        return nil, fmt.Errorf("failed to decode response: %w", err)
    }

    return &tokenResponse, nil
}
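This function calls the Bluesky endpoint com.atproto.server.createSession. The response contains several fields, but the one that matters for now is accessJwt, which is what we need to authorize the bot as a Bearer token. With that, token generation is ready.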
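This will be the most complex part of the bot: we need to consume a Bluesky endpoint.
First, let's create a variable to hold the endpoint (see the documentation).
var (
    wsURL = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
)
Now let's create the structs: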
type RepoCommitEvent struct {
    Repo   string          `cbor:"repo"`
    Rev    string          `cbor:"rev"`
    Seq    int64           `cbor:"seq"`
    Since  string          `cbor:"since"`
    Time   string          `cbor:"time"`
    TooBig bool            `cbor:"tooBig"`
    Prev   interface{}     `cbor:"prev"`
    Rebase bool            `cbor:"rebase"`
    Blocks []byte          `cbor:"blocks"`
    Ops    []RepoOperation `cbor:"ops"`
}

type RepoOperation struct {
    Action string      `cbor:"action"`
    Path   string      `cbor:"path"`
    Reply  *Reply      `cbor:"reply"`
    Text   []byte      `cbor:"text"`
    CID    interface{} `cbor:"cid"`
}

type Reply struct {
    Parent Parent `json:"parent"`
    Root   Root   `json:"root"`
}

type Parent struct {
    Cid string `json:"cid"`
    Uri string `json:"uri"`
}

type Root struct {
    Cid string `json:"cid"`
    Uri string `json:"uri"`
}

type Post struct {
    Type  string `json:"$type"`
    Text  string `json:"text"`
    Reply *Reply `json:"reply"`
}
We will also use the Gorilla WebSocket package. Download it with:
go get github.com/gorilla/websocket
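Initially, the Websocket function looks like this:
func Websocket() error {
    conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    if err != nil {
        slog.Error("Failed to connect to WebSocket", "error", err)
        return err
    }
    defer conn.Close()

    for {
        _, message, err := conn.ReadMessage()
        if err != nil {
            // After a read error the connection is no longer usable,
            // so return instead of retrying the read forever.
            slog.Error("Error reading message from WebSocket", "error", err)
            return err
        }
        _ = message // decoded in the next step
    }
}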
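With this, we can read the messages arriving over the websocket in an infinite for loop, but the messages are encoded in CBOR.
CBOR (Concise Binary Object Representation) is a binary data format for representing data compactly and efficiently.
It is similar to JSON, but it uses binary bytes instead of human-readable text, which makes messages smaller and faster to transmit and process.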
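To decode it, we need a CBOR package. The calls used below (cbor.NewDecoder and cbor.Tag) match the API of github.com/fxamacker/cbor/v2.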
decoder := cbor.NewDecoder(bytes.NewReader(message))
Just wrap the message in a reader, like this:
func Websocket() error {
    conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
    if err != nil {
        slog.Error("Failed to connect to WebSocket", "error", err)
        return err
    }
    defer conn.Close()

    slog.Info("Connected to WebSocket", "url", wsURL)

    for {
        _, message, err := conn.ReadMessage()
        if err != nil {
            // After a read error the connection is no longer usable.
            slog.Error("Error reading message from WebSocket", "error", err)
            return err
        }

        // A single websocket message can hold more than one CBOR object,
        // so keep decoding until the reader is exhausted.
        decoder := cbor.NewDecoder(bytes.NewReader(message))

        for {
            var evt RepoCommitEvent
            err := decoder.Decode(&evt)
            if err == io.EOF {
                break
            }
            if err != nil {
                slog.Error("Error decoding CBOR message", "error", err)
                break
            }

            // handleEvent is defined in the next step and logs its own errors.
            _ = handleEvent(evt)
        }
    }
}
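decoder.Decode(&evt): the decoder reads the received data and decodes it from CBOR into the RepoCommitEvent type. evt holds the decoded data.
if err == io.EOF { break }: when the decoder reaches the end of the data (there is nothing left in the message), it returns io.EOF (end of file). When that happens, break exits the loop because there is no more data to process.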
Let's create a function to handle the event:
func handleEvent(evt RepoCommitEvent) error {
    for _, op := range evt.Ops {
        // Only newly created records are of interest.
        if op.Action == "create" {
            if len(evt.Blocks) > 0 {
                err := handleCARBlocks(evt.Blocks, op)
                if err != nil {
                    slog.Error("Error handling CAR blocks", "error", err)
                    return err
                }
            }
        }
    }

    return nil
}
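The evt parameter: the function receives evt, an event of type RepoCommitEvent. The event contains a list of operations (Ops) and, possibly, data blocks (Blocks) related to those operations.
Looping over the operations: an event can contain several operations, so the code iterates over each one with for _, op := range evt.Ops.
Checking op.Action == "create": for each operation, the code checks whether the action is create, that is, whether the operation is creating new content on Bluesky, such as a post or another kind of record.
Checking for blocks with len(evt.Blocks) > 0: when a create operation is found, the code checks whether the event carries data blocks. The blocks hold the actual content of the records the operations refer to.
Processing the blocks with handleCARBlocks: if there are blocks, handleCARBlocks is called to process them. That function interprets the data inside the blocks (CAR is covered below).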
CAR (Content Addressable Archive) is an archive format that stores data efficiently and securely using content addressing. This means that each piece of data is identified by its content rather than a specific location.
Here is a simple explanation:
Content Identified by Hash: Each block of data in a CAR file is identified by a hash (a unique identifier generated from the content of the data). This ensures that the same piece of data always has the same identifier.
Used in IPFS and IPLD: CAR is widely used in systems such as IPFS (InterPlanetary File System) and IPLD (InterPlanetary Linked Data), where data is distributed and retrieved over the network based on its content rather than its location. Bluesky works this way too.
Data Blocks: A CAR file can store multiple blocks of data, and each block can be retrieved individually using its content identifier (CID).
Efficient and Safe: Since a block's identifier depends on its content, it is easy to verify that the data is correct and has not been altered.
This is a very simple explanation; if you want to go deeper, the CAR format documentation is the place to look.
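The idea of content addressing can be sketched in a few lines. The example below uses a plain hex-encoded SHA-256 digest as the identifier, a deliberate simplification: real CIDs wrap the hash together with version and codec information. The store type and its methods are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// store maps a content-derived identifier to the data it identifies.
type store map[string][]byte

// address derives the identifier from the content itself.
func address(data []byte) string {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:])
}

// put saves the data under its own hash and returns that identifier.
func (s store) put(data []byte) string {
	id := address(data)
	s[id] = data
	return id
}

// get returns the data only if it still hashes to the requested
// identifier, which is how tampering is detected.
func (s store) get(id string) ([]byte, bool) {
	data, ok := s[id]
	if !ok || address(data) != id {
		return nil, false
	}
	return data, true
}

func main() {
	s := store{}
	id := s.put([]byte("hello bluesky"))
	same := s.put([]byte("hello bluesky"))
	fmt.Println(id == same) // prints: true

	data, ok := s.get(id)
	fmt.Println(string(data), ok) // prints: hello bluesky true
}
```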
This will be the most complex function of the bot:
func handleCARBlocks(blocks []byte, op RepoOperation) error {
    if len(blocks) == 0 {
        return errors.New("no blocks to process")
    }

    reader, err := carv2.NewBlockReader(bytes.NewReader(blocks))
    if err != nil {
        slog.Error("Error creating CAR block reader", "error", err)
        return err
    }

    for {
        block, err := reader.Next()
        if err == io.EOF {
            break
        }
        if err != nil {
            slog.Error("Error reading CAR block", "error", err)
            break
        }

        // The operation's CID arrives as a CBOR tag wrapping raw bytes.
        if opTag, ok := op.CID.(cbor.Tag); ok {
            if cidBytes, ok := opTag.Content.([]byte); ok {
                c, err := decodeCID(cidBytes)
                if err != nil {
                    slog.Error("Error decoding CID from bytes", "error", err)
                    continue
                }

                // Only the block that matches the operation holds the record.
                if block.Cid().Equals(c) {
                    var post Post
                    err := cbor.Unmarshal(block.RawData(), &post)
                    if err != nil {
                        slog.Error("Error decoding CBOR block", "error", err)
                        continue
                    }

                    // Skip records without text and posts that are not replies.
                    if post.Text == "" || post.Reply == nil {
                        continue
                    }

                    if utils.FilterTerms(post.Text) {
                        repost(&post) // repost is created later in the article
                    }
                }
            }
        }
    }

    return nil
}
The repost() function is created later; it takes a *Post as its parameter.
Remember that our bot only monitors comments on posts. If a new top-level post contains a monitored hashtag, no repost is made: the check if post.Text == "" || post.Reply == nil prevents it, because a reply must be present, and that only happens when the record is a comment on a post.
The handleCARBlocks function processes data blocks in CAR format. Let's understand step by step what the function does in a simple way:
if len(blocks) == 0 { return errors.New("no blocks to process") }
If the blocks are empty, the function returns an error saying that there are no blocks to process.
reader, err := carv2.NewBlockReader(bytes.NewReader(blocks))
The function creates a block reader to interpret the data contained in the CAR file. We are using the go-car (v2) and go-cid packages.
To install them, run:
go get github.com/ipld/go-car/v2
go get github.com/ipfs/go-cid
for { block, err := reader.Next() if err == io.EOF { break } }
The function enters a loop to read all data blocks one by one. When all blocks are read (i.e. the end is reached), the loop stops.
if opTag, ok := op.CID.(cbor.Tag); ok { if cidBytes, ok := opTag.Content.([]byte); ok { c, err := decodeCID(cidBytes)
The function checks whether the operation contains a CID (Content Identifier) that can be decoded. This CID identifies the specific content of the block.
if block.Cid().Equals(c) { var post Post err := cbor.Unmarshal(block.RawData(), &post)
If the block read has the same CID as the operation, the block content is decoded into a format that the function understands, such as a "Post".
if post.Text == "" || post.Reply == nil { continue } if utils.FilterTerms(post.Text) { repost(&post) }
If the post has text and a reply, it is filtered with a function called FilterTerms. If it passes the filter, it is reposted.
The decodeCID function decodes a content identifier (CID) from a set of bytes. It takes these bytes and tries to transform them into a CID that can be used to identify blocks of data. In DAG-CBOR, the bytes inside the CID tag start with a 0x00 identity-multibase prefix, which is presumably why passing them to cid.Decode as a string works here.
func decodeCID(cidBytes []byte) (cid.Cid, error) {
    c, err := cid.Decode(string(cidBytes))
    if err != nil {
        return cid.Undef, fmt.Errorf("error decoding CID: %w", err)
    }

    return c, nil
}
With that, we have the Websocket ready.
Let's create the following within utils in filter-terms.go:
var (
    terms = []string{"#hashtag2", "#hashtag1"}
)

func FilterTerms(text string) bool {
    for _, term := range terms {
        if strings.Contains(strings.ToLower(text), strings.ToLower(term)) {
            return true
        }
    }

    return false
}
This function is where we define the hashtags to monitor. It receives the text that came from the websocket and checks it against the terms.
Let's create a function called createRecord in the create-record.go file, which will be responsible for creating a repost or a like, depending on the $type that is sent via parameter.
First, let's create a struct with the parameters we will need:
type CreateRecordProps struct {
    DIDResponse *DIDResponse
    Resource    string
    URI         string
    CID         string
}
The final function will look like this:
func createRecord(r *CreateRecordProps) error {
    body := map[string]interface{}{
        "$type":      r.Resource,
        "collection": r.Resource,
        "repo":       r.DIDResponse.DID,
        "record": map[string]interface{}{
            "subject": map[string]interface{}{
                "uri": r.URI,
                "cid": r.CID,
            },
            "createdAt": time.Now(),
        },
    }

    jsonBody, err := json.Marshal(body)
    if err != nil {
        slog.Error("Error marshalling request", "error", err, "resource", r.Resource)
        return err
    }

    url := fmt.Sprintf("%s/com.atproto.repo.createRecord", API_URL)
    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody))
    if err != nil {
        slog.Error("Error creating request", "error", err, "resource", r.Resource)
        return err
    }

    req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.DIDResponse.AccessJwt))
    req.Header.Set("Content-Type", "application/json")

    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        slog.Error("Error sending request", "error", err, "resource", r.Resource)
        return err
    }
    // Always release the response body.
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        slog.Error("Unexpected status code", "status", resp.StatusCode, "resource", r.Resource)
        return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
    }

    slog.Info("Published successfully", "resource", r.Resource)

    return nil
}
It's simple to understand. We make a POST to the API_URL/com.atproto.repo.createRecord endpoint, stating that we are going to create a record. In the body we send the $type, which tells the Bluesky API which kind of record to create. Then we assemble the request, add the Bearer token, and handle errors, returning them to the caller so that a failed request is not reported as a success.
This way we can use the createRecord function to create several records, changing only the $type.
With createRecord ready, it's simple to create the repost, let's do this in the repost.go file:
func repost(p *Post) error {
    token, err := getToken()
    if err != nil {
        slog.Error("Error getting token", "error", err)
        return err
    }

    // The repost targets the root post of the thread the comment belongs to.
    resource := &CreateRecordProps{
        DIDResponse: token,
        Resource:    "app.bsky.feed.repost",
        URI:         p.Reply.Root.Uri,
        CID:         p.Reply.Root.Cid,
    }

    err = createRecord(resource)
    if err != nil {
        slog.Error("Error creating record", "error", err, "resource", resource.Resource)
        return err
    }

    // Reuse the same props for the like, changing only the resource.
    resource.Resource = "app.bsky.feed.like"
    err = createRecord(resource)
    if err != nil {
        slog.Error("Error creating record", "error", err, "resource", resource.Resource)
        return err
    }

    return nil
}
We receive a *Post from handleCARBlocks, which runs inside the Websocket() loop. We set up the CreateRecordProps, stating that we are going to make a repost through the app.bsky.feed.repost resource, and then call createRecord.
After creating the repost, we give the post a like (optional). We just call createRecord again, now with the app.bsky.feed.like resource. Since the props are already in a variable, we only need to set a new value, which is what resource.Resource = "app.bsky.feed.like" does.
With that, we can now make the repost and the like.
This part is optional and only matters for deployment. The hosting service uses it to check whether our bot is still running. It is a very simple endpoint that only returns status code 200.
Let's do it in the health-check.go file:
func HealthCheck(w http.ResponseWriter, r *http.Request) {
    w.WriteHeader(http.StatusOK)
}
The HealthCheck function does nothing but call w.WriteHeader(http.StatusOK). This could be done directly in main.go, which is where we start the web server, but I chose to keep it separate.
Well, now we just need to get everything running, let's do that in main.go:
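func main() {
    slog.Info("Starting bot")

    err := godotenv.Load()
    if err != nil {
        slog.Error("Error loading .env file")
    }

    // The health check server runs in its own goroutine
    // so it does not block the websocket loop.
    go func() {
        http.HandleFunc("/health", bot.HealthCheck)
        slog.Info("Starting health check server on :8080")
        if err := http.ListenAndServe(":8080", nil); err != nil {
            log.Fatal("Failed to start health check server:", err)
        }
    }()

    err = bot.Websocket()
    if err != nil {
        log.Fatal(err)
    }
}
Very simple too: we load the .env file, start the health check server in a goroutine, and run the Websocket loop, exiting if it returns an error.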
Now, let's run:
go run cmd/main.go
We will have the bot running:
2024/09/13 09:11:31 INFO Starting bot
2024/09/13 09:11:31 INFO Starting health check server on :8080
2024/09/13 09:11:32 INFO Connected to WebSocket url=wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
We can test it on Bluesky. I used the hashtag #bot-teste for testing: create a post and comment on it with the hashtag.
The repost was made and the post now has the like, and the terminal shows the logs:
2024/09/13 09:14:16 INFO Published successfully resource=app.bsky.feed.repost
2024/09/13 09:14:16 INFO Published successfully resource=app.bsky.feed.like
We have covered how to create a bot for the Bluesky social network, using Golang and various technologies such as Websockets, AT Protocol, CAR and CBOR.
The bot is responsible for monitoring specific hashtags and, when it finds one of them, it reposts and likes the original post.
This is just one of the things the bot can do. The Bluesky API is very complete and opens up many possibilities, so feel free to take this bot and add new features.