How to implement concurrent queuing based on redis optimistic locking

The requirement scenario is this: use Redis to control the number of running scrapy instances. With the limit set to 4 in the system backend, at most 4 scrapy tasks may run at once; any excess tasks are queued up to wait.

Overview

I recently built a crawler system with django, scrapy, celery, and redis. The host the customer purchased also has to run other programs besides the one I developed, so the number of scrapy instances has to be controlled manually to keep excessive crawlers from burdening the system.

Process Design

1. Crawler tasks are initiated by users in the form of requests, and all user requests enter celery uniformly for queuing;
2. Control of the number of executing tasks is handed over to redis: celery saves to redis the information a crawler needs to start, and taking one piece of information from redis starts one crawler;
3. The number of currently running crawlers is obtained through the scrapyd interface to decide the next step: if fewer than 4 are running, fetch the corresponding amount of information from redis and start crawlers; if 4 or more are running, keep waiting;
4. When the number of running crawlers drops, promptly fetch the corresponding amount of information from redis and start new crawlers.
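The decision step in the list above can be sketched as a single, dependency-injected polling function. The function names and the queue accessors below are illustrative, not part of the original system; in practice `get_status` would wrap a request to scrapyd's `daemonstatus.json` and `pop_info` would pop from redis:

```python
def dispatch_once(get_status, pop_info, start_crawler, limit=4):
    """One polling step of the dispatch loop.

    If scrapyd reports a free slot (fewer than `limit` jobs running or
    pending), pop one piece of crawler info from the queue and start a
    crawler with it. Returns True if a crawler was started.
    """
    status = get_status()  # e.g. requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
    running = status.get('running', 0) + status.get('pending', 0)
    if running >= limit:
        return False       # all slots taken: keep waiting
    info = pop_info()      # e.g. redis_client.lpop('crawler_queue')
    if info is None:
        return False       # nothing queued
    start_crawler(info)
    return True
```

A scheduler (celery beat, or a plain loop with a sleep) would call this repeatedly, which also covers step 4: as soon as the running count drops, the next call starts a crawler.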

Code implementation

The business code is somewhat complicated and verbose, so pseudocode is used here to demonstrate the idea:

import time

import redis
import requests

# Create a redis connection pool
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')

r = redis.Redis(connection_pool=pool)
# Limit crawler instances to 4, i.e. only allow 4 scrapy instances to run
limited = 4

# Declare the redis optimistic lock (redis-py exposes it as r.lock(name))
lock = r.lock('crawler_lock')

# lock.acquire() contains a while loop, i.e. it blocks the thread until the
# current thread obtains the redis lock; only then does execution continue
if lock.acquire():
	# 1. Take one piece of crawler info from redis (key name illustrative)
	info = r.get('crawler_info')

	# 2. Monitor the number of running crawlers in a while loop
	while True:
		req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
		# Count how many crawlers are currently running
		running = req.get('running') + req.get('pending')

		# 3. Decide whether to keep waiting or to start another crawler
		# 3.1 If the running count is at or above the limit, keep waiting
		if running >= limited:
			time.sleep(1)
			continue

		# 3.2 If below the limit, start a crawler
		start_scrapy(info)
		# 3.3 Delete the info from redis
		r.delete('crawler_info')
		# 3.4 Release the lock
		lock.release()
		break

This is still just pseudocode; the actual business logic can be much more complex, for example:

@shared_task
def scrapy_control(key_uuid):

    r = redis.Redis(connection_pool=pool)
    db = MysqlDB()
    speed_limited = db.fetch_config('REPTILE_SPEED')
    speed_limited = int(speed_limited[0])

    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
    keywords_num = int(keywords_num[0])


    # while True:
    lock = r.lock('lock')
    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' entered processing' +  '\n')
    try:
        # acquire blocks by default: if the lock cannot be obtained, it keeps
        # blocking inside the function's while loop
        if lock.acquire():
            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' acquired lock' +  '\n')
            # 1 Fetch the info from redis
            redis_obj = json.loads(r.get(key_uuid))
            user_id = redis_obj.get('user_id')
            contents = redis_obj.get('contents')
            
            # 2 Handle the core logic in a while loop
            is_hold_print = True
            while True:
                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
                running = req.get('running') + req.get('pending')
                # 3 If enough crawlers are still running, hold the redis lock
                #   and wait for a crawler slot to free up
                if running >= speed_limited:
                    if is_hold_print:
                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' crawlers running, thread waiting' +  '\n')
                        is_hold_print = False
                    time.sleep(1)
                    continue
                
                # 4 A crawler slot is free, so continue
                # 4.1 Once all contents are processed, release the lock
                if len(contents) == 0:
                    r.delete(key_uuid)
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' task finished, deleted from redis' +  '\n')
                    lock.release()
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' released lock' +  '\n')
                    break

                # 4.2 Create the Task record
                task_uuid = str(uuid.uuid4())
                article_obj = contents.pop()
                article_id = article_obj.get('article_id')
                article = article_obj.get('content')
                try:
                    Task.objects.create(
                        task_uuid = task_uuid,
                        user_id = user_id,
                        article_id = article_id,
                        content = article
                    )
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' error creating Task: ' + str(e) +  '\n')
                # finally:
                # 4.3 Start the crawler task; it starts even if creating the Task failed
                try:
                    task_chain(user_id, article, task_uuid, keywords_num)
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' failed to start task chain: ' + str(e) +  '\n')
                
                # Sleep so this thread does not start extra crawlers because the
                # code runs faster than the crawlers start
                time.sleep(5)

    except Exception as e:
        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' error after acquiring lock: ' + str(e) +  '\n')
        lock.release()

Note
scrapy starts up relatively slowly, so inside the while loop, after the code that starts a crawler runs, you need to sleep for a while before fetching the number of running crawlers through the scrapyd interface. Reading it immediately may cause a misjudgment.
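Instead of a fixed sleep, the loop can poll until scrapyd's running count actually reflects the newly started job. The function below is an illustrative sketch; `get_running` is assumed to wrap a request to `daemonstatus.json` and return `running + pending`:

```python
import time

def wait_until_registered(get_running, previous, timeout=30, interval=1):
    """After starting a crawler, poll the running count until scrapyd
    reports more jobs than before, so the next slot decision is not based
    on a stale count. Returns True if the new job showed up in time."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_running() > previous:
            return True
        time.sleep(interval)
    return False
```

A fixed `time.sleep(5)` as in the code above is simpler; polling just avoids waiting longer than necessary on a fast host, while still failing safe when startup is slow.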

The above is the detailed content of How to implement concurrent queuing based on redis optimistic locking. For more information, please follow other related articles on the PHP Chinese website!

Statement
This article is reproduced from 亿速云. If there is any infringement, please contact admin@php.cn to have it deleted.