⛏️

MongoDB

Created
Mar 18, 2024 07:58 AM
Tags

爬虫写入代码

# -*- coding: utf-8 -*-
# @Time    : 2023/11/23 10:11
# @Author  : lzc
# @Email   : hybpjx@163.com
# @File    : to_mongodb.py
# @Software: PyCharm
# @blog    : https://www.cnblogs.com/zichliang
"""Thin MongoDB wrapper for crawlers: upsert scraped items and skip known ones."""
from pprint import pprint

from loguru import logger
from pymongo import MongoClient


class MongoConnection:
    """Handle on a single MongoDB collection.

    Attributes:
        coll: The pymongo collection every method operates on.
    """

    def __init__(self, database: str, table_name: str, connect_url="mongodb://localhost:27017/", ):
        """Connect to ``connect_url`` and select ``database`` / ``table_name``.

        Args:
            database: Name of the database.
            table_name: Name of the collection inside that database.
            connect_url: MongoDB connection URI.
        """
        client = MongoClient(connect_url)
        self.coll = client[database][table_name]

    def save(self, item):
        """Upsert ``item`` into the collection.

        The item is used both as the filter and as the ``$set`` payload, so an
        identical document already stored is left untouched and anything else
        is inserted as a new document.

        Args:
            item: Mapping of field names to values to store.
        """
        self.coll.update_one(item, {"$set": item}, upsert=True)
        logger.info("写入成功")

    def is_exist(self, key, value):
        """Tell the caller whether ``{key: value}`` still needs to be crawled.

        NOTE: despite its name, this returns ``True`` when the value is NOT in
        the collection yet and ``False`` when it is already stored. That
        convention is kept because existing callers rely on it.

        Args:
            key: Field name to look up.
            value: Value the field must match.

        Returns:
            ``True`` if no document matches, ``False`` if one does.
        """
        # Test the lookup result directly instead of subscripting it and
        # catching TypeError: the subscript raised KeyError when the filter
        # matched a document lacking the field (value=None), and the old
        # truthiness/equality checks fell through to an implicit None for
        # falsy values or array fields that merely contain ``value``.
        document = self.coll.find_one({key: value}, {"_id": 0})
        if document is None:
            logger.success(f"数据库中 暂无数据:{value} ")
            return True

        logger.warning("""!!! {%s:%s} 已爬,现在正在跳过... !!!!""" % (key, value))
        return False


if __name__ == '__main__':
    mc = MongoConnection(database="设计研究院", table_name="xx网爬取")
    print(mc.is_exist("项目名称", "凹凸棒石粘土矿30万立方米/年采矿技改扩建项目"))

motor 结合 Sanic 导出 Excel(包含前端)

安装

pip install motor

代码

"""Sanic service that lists MongoDB collections and exports one as an XLSX file."""
import urllib
import urllib.parse  # ``import urllib`` alone does not guarantee the ``parse`` submodule is loaded
from io import BytesIO

from sanic import Sanic, response
from pymongo import MongoClient
from openpyxl import Workbook
from sanic_jinja2 import SanicJinja2
from motor.motor_asyncio import AsyncIOMotorClient

app = Sanic(__name__)
jinja = SanicJinja2(app)
app.config.JINJA2_TEMPLATE_DIR = './templates'

# Single source of truth for the database name. The index page and the export
# route previously used two different names ('京东' and 'xxx'), so collections
# shown on the page could never be exported. Change this to your own database.
DATABASE_NAME = '京东'

XLSX_CONTENT_TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet; charset=utf-8'


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Create the shared motor client before the server starts accepting requests."""
    app.ctx.client = AsyncIOMotorClient('mongodb://localhost:27017')


async def get_mongo_data(collection_name):
    """Return every document of ``collection_name`` as a list of dicts, without ``_id``."""
    collection = app.ctx.client[DATABASE_NAME].get_collection(collection_name)
    cursor = collection.find({}, {'_id': 0})
    return [dict(record) async for record in cursor]


def _collect_headers(records):
    """Return the union of all record keys, in first-seen order."""
    ordered = {}
    for record in records:
        for key in record:
            ordered.setdefault(key, None)
    return list(ordered)


def _to_cell(value):
    """Turn nested BSON containers into text; openpyxl rejects list/dict cell values."""
    if isinstance(value, (dict, list)):
        return str(value)
    return value


@app.route('/')
async def index(request):
    """Render the HTML page that shows one export button per collection."""
    db = app.ctx.client[DATABASE_NAME]
    collection_names = await db.list_collection_names()
    return jinja.render('index.html', request, collection_names=sorted(collection_names))


@app.route('/export')
async def export_data(request):
    """Export the collection named by the ``collection_name`` query parameter as XLSX.

    Responds with 400 when the parameter is missing, with 404 when the
    collection holds no documents (which includes a collection that does not
    exist), and otherwise with the workbook as an attachment.
    """
    collection_name = request.args.get('collection_name')
    if not collection_name:
        return response.json({"error": "Missing required parameter 'collection_name'"}, status=400)

    mongo_data = await get_mongo_data(collection_name)
    if not mongo_data:
        # Without this guard the header lookup on the first record raised
        # IndexError and the client got an opaque 500.
        return response.json(
            {"error": f"Collection '{collection_name}' is empty or does not exist"},
            status=404,
        )

    wb = Workbook()
    ws = wb.active  # the active sheet is the first one by default
    ws.title = 'Sheet1'

    # Documents in one collection may have different key sets; building each
    # row from the shared header list keeps every value under its own column.
    column_names = _collect_headers(mongo_data)
    ws.append(column_names)
    for record in mongo_data:
        ws.append([_to_cell(record.get(name)) for name in column_names])

    # Serialise the workbook into memory instead of a temporary file.
    with BytesIO() as output:
        wb.save(output)
        excel_data = output.getvalue()

    # The file name is percent-encoded so non-ASCII collection names survive
    # the header; the front end decodes it again.
    headers = {
        'Content-Disposition': f'attachment; filename={urllib.parse.quote(collection_name)}.xlsx'
    }
    return response.raw(excel_data, headers=headers, content_type=XLSX_CONTENT_TYPE)


# Close the connection when the server stops (optional, depending on your needs).
@app.listener('after_server_stop')
async def close_db(app, loop):
    """Release the shared motor client."""
    app.ctx.client.close()


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8888)
前端(模板文件:./templates/index.html)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>表格导出</title>
    <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
</head>
<body>
<!-- One row per MongoDB collection; each button downloads that collection as XLSX. -->
<ul>
    {% for name in collection_names %}
    <li>
        <span>{{ name }}</span> —————————————————————— <button class="export-btn">导出成excel</button>
    </li>
    {% endfor %}
</ul>
<script>
    $(document).ready(function () {
        $(".export-btn").click(function (event) {
            event.preventDefault();

            // Find the label by selector rather than hopping over sibling
            // nodes, which breaks as soon as the markup between the span and
            // the button changes.
            var name = $(this).closest('li').find('span').first().text();

            $.ajax({
                url: '/export',
                type: 'GET',
                // Let jQuery URL-encode the value; a raw template string broke
                // on names containing '&', '#', '+' or spaces.
                data: {collection_name: name},
                xhrFields: {
                    responseType: 'blob' // receive the binary file as a Blob
                },
                success: function (data, textStatus, jqXHR) {
                    // Fall back to the collection name when the header is
                    // missing or cannot be parsed.
                    var filename = name + '.xlsx';
                    var disposition = jqXHR.getResponseHeader('Content-Disposition') || '';
                    var match = disposition.match(/filename[^;=\n]*=((['"]).*?\2|[^;\n]*)/);
                    if (match && match[1]) {
                        // Drop the optional surrounding quotes of the header value.
                        var rawName = match[1].replace(/^(['"])(.*)\1$/, '$2');
                        try {
                            // The server percent-encodes every reserved
                            // character, which decodeURI would leave untouched.
                            filename = decodeURIComponent(rawName);
                        } catch (e) {
                            filename = rawName;
                        }
                    }

                    var blob = new Blob([data], {type: jqXHR.getResponseHeader('Content-Type')});

                    // Create a hidden download link and click it.
                    var a = document.createElement('a');
                    a.href = URL.createObjectURL(blob);
                    a.download = filename;
                    a.style.display = 'none';
                    document.body.appendChild(a);
                    a.click();

                    // Clean up on the next tick so the browser has started the
                    // download before the object URL is revoked.
                    setTimeout(function () {
                        window.URL.revokeObjectURL(a.href);
                        document.body.removeChild(a);
                    }, 0);
                },
                error: function (jqXHR, textStatus, errorThrown) {
                    console.error('Error:', errorThrown);
                }
            });
        });
    });
</script>
</body>
</html>