没有所谓的捷径
一切都是时间最平凡的累积

python采集wordpress文章并发布到自己wordpress

本文最后更新于2019年8月16日,已超过94天没有更新,如果文章内容失效,请反馈给我们,谢谢!

python多线程类,线程池,wordpress模块实现自动采集文章并发布到wordpress

caiji.py主文件:

线程池负责采集分页中的文章地址并加入队列,线程类负责从队列取文章地址,采集文章并发布到wordpress库

#!/usr/local/bin/python3
# coding:utf-8
import requests
import threading
from queue import Queue
from lxml import html
from time import sleep
from myxmlrpc import *
from concurrent.futures import ThreadPoolExecutor, as_completed

sucess_url = set()  # URLs scraped and published successfully (name keeps the original typo; other blocks reference it)
fail_url = set()  # URLs that failed to scrape or publish
lock = False  # producer-done flag: True once every listing page has been crawled


class Downder(threading.Thread):
    """Worker thread: pulls article URLs off a shared queue, scrapes each
    article page and publishes it to WordPress via wp_post().

    (The class name keeps the original "Downder" spelling because other
    code instantiates it by that name.)
    """

    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
    }

    def __init__(self, down_queue, times, *args, **kwargs):
        """
        :param down_queue: Queue of article URLs to scrape.
        :param times: seconds to sleep after each article (rate limiting).
        """
        super().__init__(*args, **kwargs)
        self.down_queue = down_queue
        self.times = times

    def run(self):
        # Local import: the module top level only imports Queue.
        from queue import Empty
        global lock
        while True:
            if lock and self.down_queue.empty():
                # All listing-page producers finished and the queue is
                # drained: exit the worker.
                break
            try:
                # Bug fix: the original used a blocking get() after a
                # non-atomic empty() check, so a worker could hang forever
                # if the queue drained in between.  A timeout lets the
                # loop periodically re-check the exit condition.
                url = self.down_queue.get(timeout=1)
            except Empty:
                continue
            self.content_download(url)
            sleep(self.times)  # throttle: roughly 50 requests per minute

    def content_download(self, url):
        """Scrape one article page and publish it to WordPress.

        Successful URLs are recorded in sucess_url, failures in fail_url;
        exceptions never escape (best-effort per article).
        """
        print('开始采集:', url)
        try:
            response = requests.get(url, headers=self.headers)
            result = html.fromstring(response.text)
            title = result.xpath('//h1[@class="article-title"]/a/text()')[0].encode('utf-8')
            test_category = result.xpath('//div[@class="article-meta"]/span/text()')
            categorys = []
            for index, text in enumerate(test_category):
                if '分类' in text:
                    # XPath positions are 1-based while enumerate is 0-based.
                    res = f'//div[@class="article-meta"]/span[{index + 1}]/a/text()'
                    category_result = result.xpath(res)
                    for category in category_result:
                        categorys.append(category.encode('utf-8'))
            content = result.xpath('//article[@class="article-content"]')[0]
            content = html.tostring(content, encoding="utf-8")
            wp_post(title, content, categorys, url)  # publish to WordPress
            sucess_url.add(url)
        except Exception as e:
            print(url, str(e))
            fail_url.add(url)


def crawlPageDate(url, down_queue, times):
    """Fetch one paginated listing page and enqueue every article URL found.

    :param url: listing-page URL to fetch.
    :param down_queue: queue consumed by the Downder worker threads.
    :param times: seconds to sleep after a successful fetch (rate limiting).

    On any failure the *listing-page* URL is recorded in fail_url;
    exceptions never escape.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Trident/5.0',
    }
    try:
        response = requests.get(url, headers=headers)
        response.encoding = 'utf-8'
        result = html.fromstring(response.text)
        article_urls = result.xpath('//article[contains(@class, "excerpt")]/a/@href')
        if not article_urls:
            raise Exception(url + ':URL Access failure')
        # Bug fix: the original loop reused the name `url`, so on a later
        # error the except branch recorded an *article* URL in fail_url
        # instead of the listing-page URL that actually failed.
        for article_url in article_urls:
            print('加入采集队列:', article_url)
            down_queue.put(article_url)
    except Exception as e:
        print(str(e))
        fail_url.add(url)
    else:
        sleep(times)  # sleep only on success, as before


def main():
    """Crawl the listing pages, scrape every discovered article, publish
    each to WordPress, then print a success/failure summary."""
    global lock
    page_max_workers = 5  # threads fetching paginated listing pages
    article_max_workers = 5  # threads scraping individual articles
    times = 10  # per-request sleep in seconds (rate limiting)
    down_queue = Queue()  # article URLs waiting to be scraped
    all_task = []

    # Thread pool: fetch every listing page and push article URLs onto the queue.
    pool = ThreadPoolExecutor(page_max_workers)
    for page in range(2, 100):
        url = f"https://www.xxx.com/page/{page}"  # WordPress pagination URL
        all_task.append(pool.submit(crawlPageDate, url, down_queue, times))

    # Worker threads: pop URLs off the queue, scrape, publish to WordPress.
    task_down = [Downder(down_queue, times) for _ in range(article_max_workers)]
    for worker in task_down:
        worker.start()

    # Wait for every listing-page task to finish.  Bug fix: the original
    # set `lock = True` INSIDE this loop, i.e. after the FIRST future
    # completed, which could let workers exit while other pages were
    # still producing article URLs.
    for future in as_completed(all_task):
        future.result()
    lock = True  # all article URLs have been enqueued
    pool.shutdown()

    # Block until every worker thread has drained the queue and exited.
    for worker in task_down:
        worker.join()

    # Final report: counts plus the list of failed URLs.
    print('All tasks done!')
    print('采集成功: %s 条' % len(sucess_url))
    print('采集失败: %s 条' % len(fail_url))
    print(fail_url)


# Run the crawler only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

myxmlrpc.py 发布到wordpress

#!/usr/local/bin/python3
# coding:utf-8
# module python-wordpress-xmlrpc

import os
import requests
from lxml import html
from urllib.parse import urlparse
from wordpress_xmlrpc import Client, WordPressPost
from wordpress_xmlrpc.methods.posts import GetPosts, NewPost
from wordpress_xmlrpc.methods.users import GetUserInfo
from wordpress_xmlrpc.methods import posts
from wordpress_xmlrpc.methods import taxonomies
from wordpress_xmlrpc import WordPressTerm
from wordpress_xmlrpc.compat import xmlrpc_client
from wordpress_xmlrpc.methods import media, posts


def wp_post(title, content, category, url, post_tag=None, post_status="publish"):
    """Publish one scraped article to WordPress over XML-RPC.

    Images referenced in the article body are downloaded, re-uploaded to
    our own WordPress media library, and their URLs rewritten in the body;
    the first uploaded image becomes the post thumbnail.

    :param title: article title (utf-8 bytes).
    :param content: article HTML (utf-8 bytes).
    :param category: list of category names (utf-8 bytes each); created if missing.
    :param url: source article URL (used to resolve relative image links).
    :param post_tag: optional list of tag names; created if missing.
    :param post_status: 'publish', 'draft' or 'private'.
    :return: the id of the newly created post.
    :raises Exception: any XML-RPC / network error propagates to the caller.
    """
    # Bug fix: the original declared a mutable default (post_tag=[]),
    # which is shared between calls.
    if post_tag is None:
        post_tag = []
    urlset = urlparse(url)
    domain = urlset.scheme + '://' + urlset.netloc
    # The original wrapped everything in `except Exception as e: raise e`,
    # which only rewrote the traceback; letting errors propagate is equivalent.
    wp = Client('http://www.自己的域名.top/xmlrpc.php', '管理员帐号', '密码')
    post = WordPressPost()
    post.title = title
    content = content.decode('utf-8')
    imgurls = parsecontent(content, url)  # image URLs found in the article body
    newimgurls = {}  # image name -> URL of the uploaded copy
    attachment_id = ''  # id of the first uploaded image (used as thumbnail)
    if imgurls:
        for imgurl in imgurls:
            try:
                if 'http' not in imgurl:
                    # Turn a relative path like /upload/img.jpg into a full URL.
                    imgurl = domain + '/' + imgurl
                mimetype, img = down_img(imgurl)  # (MIME type, raw bytes)
                if '?' in imgurl:
                    imgurl = imgurl[:imgurl.index('?')]  # strip the query string
                imgname = os.path.basename(imgurl)
                data = {
                    'name': imgname,
                    'type': mimetype,
                    'bits': xmlrpc_client.Binary(img),
                }
                response = wp.call(media.UploadFile(data))
                if attachment_id == '':
                    attachment_id = response.get('attachment_id')
                newimgurls[imgname] = response.get('link')
            except Exception as e:
                # Best effort: a failed image upload should not abort the
                # whole post, but the original silently swallowed the error.
                print(imgurl, str(e))
    # Rewrite image links in the body to point at the uploaded copies.
    if imgurls and newimgurls:
        for imgurl in imgurls:
            for imgname, newimgurl in newimgurls.items():
                imgname = imgname.split('.')[0]
                if imgname in imgurl and imgname in newimgurl:
                    content = content.replace(imgurl, newimgurl)
    # Replace the source site's domain / contact details with our own.
    content = replace_content(domain, content)
    post.content = content.encode('utf-8')
    post.post_status = post_status  # otherwise WordPress defaults to draft
    post.terms_names = {
        'post_tag': post_tag,  # tags are auto-created when missing
        'category': category,  # categories are auto-created when missing
    }
    if attachment_id:
        post.thumbnail = attachment_id
    post.id = wp.call(posts.NewPost(post))
    return post.id


def down_img(url):
    """Download an image and return ``(mime_type, raw_bytes)``.

    :raises Exception: when the request fails, or when the response is
        not an image ('not imgtype').
    """
    # The original caught request errors only to `raise e` again; letting
    # them propagate is equivalent and simpler.
    response = requests.get(url)
    # Bug fix: the Content-Type header can be absent, in which case the
    # original crashed with "argument of type 'NoneType' is not iterable"
    # instead of reporting a non-image response.
    imgtype = response.headers.get("Content-Type") or ''
    if 'image' not in imgtype:  # verify the response really is an image
        raise Exception('not imgtype')
    return imgtype, response.content


def parsecontent(content, url):
    """Return the list of ``<img src>`` URLs found in the article HTML.

    The ``url`` argument is accepted for interface compatibility with the
    caller (wp_post) but is not used here.
    """
    tree = html.fromstring(content)
    return tree.xpath('//img/@src')


def replace_content(domain, content):
    """Swap the source site's domain and contact details for our own.

    :param domain: the scraped site's base URL (scheme + host).
    :param content: article HTML as a str.
    :return: the rewritten HTML.
    """
    replacements = (
        (domain, 'http://www.自己的域名.top'),
        ('对方邮箱@vip.com', '自己的邮箱@qq.com'),
        ('对方qq', '自己qq'),
    )
    for old, new in replacements:
        content = content.replace(old, new)
    return content

 

赞(0) 打赏
声明:本站发布的内容(图片、视频和文字)以原创、转载和分享网络内容为主,若涉及侵权请及时告知,将会在第一时间删除,联系邮箱:lwarm@qq.com。文章观点不代表本站立场。本站原创内容未经允许不得转载,或转载时需注明出处:红岩子 » python采集wordpress文章并发布到自己wordpress
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

今天所做的努力都是在为明天积蓄力量

联系我们赞助我们