⛏️

MongoDB

爬虫写入代码

import json

import pandas as pd
from loguru import logger
from pymongo import MongoClient


class MongoConnection:
    """Small convenience wrapper around a single MongoDB collection.

    Intended for crawler scripts: it offers an idempotent ``save``, a
    "has this already been crawled?" check, and JSON / Excel exports of the
    whole collection.

    The instance can be used as a context manager so the underlying client
    is closed deterministically::

        with MongoConnection("db", "table") as conn:
            conn.save({"name": "x"})
    """

    def __init__(self, database: str, table_name: str, connect_url="mongodb://localhost:27017/", ):
        """Connect to ``connect_url`` and select ``database``.``table_name``.

        Args:
            database: Name of the MongoDB database.
            table_name: Name of the collection inside ``database``.
            connect_url: MongoDB connection URI.
        """
        # Keep a reference to the client so close() can release its sockets.
        self.client = MongoClient(connect_url)
        self.database = database
        self.table_name = table_name
        self.coll = self.client[self.database][self.table_name]

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying MongoDB client."""
        self.client.close()

    def save(self, item):
        """Insert ``item`` unless an identical document already exists.

        The whole ``item`` is used as the filter of an upsert, so a document
        whose fields all equal ``item``'s is left untouched and anything else
        results in a new document.
        """
        self.coll.update_one(item, {"$set": item}, upsert=True)
        logger.info("写入成功")

    def is_exist(self, key, value):
        """Tell the caller whether ``value`` still needs to be crawled.

        NOTE: despite the name, the return value is inverted and callers
        rely on that.

        Args:
            key: Field name to match.
            value: Value the field must have.

        Returns:
            ``True`` when no document has ``key == value`` (nothing stored
            yet), ``False`` when such a document already exists.
        """
        # Only existence matters, so fetch just the _id. Checking the result
        # against None (instead of indexing it and catching TypeError) also
        # behaves correctly for falsy values such as 0 or "" and for dotted
        # field paths.
        found = self.coll.find_one({key: value}, {"_id": 1})
        if found is None:
            logger.success(f"数据库中 暂无数据:{value} ")
            return True
        logger.warning("""!!! {%s:%s} 已爬,现在正在跳过... !!!!""" % (key, value))
        return False

    def to_export_json(self):
        """Dump the whole collection to ``<database>_<table_name>.json``.

        Values that ``json`` cannot serialize natively are converted with
        ``str``.
        """
        json_data = json.dumps(list(self.find_data()), default=str, ensure_ascii=False)
        # Serialize first, then write, so a serialization error never leaves
        # a half-written file behind.
        with open(f'{self.database}_{self.table_name}.json', 'w', encoding="utf-8") as file:
            file.write(json_data)

    def find_count(self):
        """Return the number of documents in the collection."""
        return self.coll.count_documents({})

    def find_data(self):
        """Return a cursor over every document, with ``_id`` excluded."""
        return self.coll.find({}, {"_id": 0})

    def to_export_xlsx(self):
        """Dump the whole collection to ``<database>_<table_name>.xlsx``."""
        df = pd.DataFrame(list(self.find_data()))
        # index=False keeps the DataFrame's row index out of the sheet.
        df.to_excel(f'{self.database}_{self.table_name}.xlsx', index=False)
        print("数据已保存为Excel文件。")

Motor 结合 Sanic 导出 Excel(包含前端)

安装

pip install motor sanic sanic-jinja2 openpyxl

代码

import datetime
import urllib.parse
from io import BytesIO

from sanic import Sanic, response
from pymongo import MongoClient
from openpyxl import Workbook
from sanic_jinja2 import SanicJinja2
from motor.motor_asyncio import AsyncIOMotorClient

app = Sanic(__name__)
jinja = SanicJinja2(app)
app.config.JINJA2_TEMPLATE_DIR = './templates'

# Single source of truth for the database name, so the collection list shown
# on the index page and the export endpoint always read the same database.
DB_NAME = '京东'

# Value types written to a cell unchanged; everything else (nested documents,
# lists, ObjectId, ...) is stringified before being handed to openpyxl.
_EXCEL_NATIVE_TYPES = (
    str,
    int,
    float,
    bool,
    datetime.datetime,
    datetime.date,
    datetime.time,
    datetime.timedelta,
)


def _to_cell_value(value):
    """Return ``value`` unchanged if it is a plain scalar, else ``str(value)``."""
    if value is None or isinstance(value, _EXCEL_NATIVE_TYPES):
        return value
    return str(value)


@app.listener('before_server_start')
async def setup_db(app, loop):
    """Create the shared Motor client before the server starts."""
    app.ctx.client = AsyncIOMotorClient('mongodb://localhost:27017')


async def get_mongo_data(collection_name):
    """Return every document of ``collection_name`` as a list of dicts.

    The ``_id`` field is excluded from the result.
    """
    collection = app.ctx.client[DB_NAME].get_collection(collection_name)
    cursor = collection.find({}, {'_id': 0})
    return [dict(record) async for record in cursor]


@app.route('/')
async def index(request):
    """Render the page listing every collection with an export button."""
    db = app.ctx.client[DB_NAME]
    collection_names = await db.list_collection_names()
    return jinja.render('index.html', request, collection_names=sorted(collection_names))


@app.route('/export')
async def export_data(request):
    """Export one collection as an ``.xlsx`` download.

    Query parameters:
        collection_name: Name of the collection to export (required).

    Returns:
        The workbook bytes as an attachment, a 400 JSON error when the
        parameter is missing, or a 404 JSON error when the collection has
        no documents.
    """
    collection_name = request.args.get('collection_name')
    if not collection_name:
        return response.json({"error": "Missing required parameter 'collection_name'"}, status=400)

    mongo_data = await get_mongo_data(collection_name)
    if not mongo_data:
        # An unknown collection and an empty one look the same here; either
        # way there is no first record to derive a header row from.
        return response.json(
            {"error": f"Collection '{collection_name}' is empty or does not exist"},
            status=404,
        )

    # Documents may carry different key sets, so use the union of all keys
    # (in first-seen order) as the columns and look every value up by key.
    # This keeps each value under its own header.
    column_names = list(dict.fromkeys(key for record in mongo_data for key in record))

    wb = Workbook()
    ws = wb.active
    ws.title = 'Sheet1'
    ws.append(column_names)
    for record in mongo_data:
        ws.append([_to_cell_value(record.get(column)) for column in column_names])

    # Serialize the workbook into memory instead of a temporary file.
    with BytesIO() as output:
        wb.save(output)
        excel_data = output.getvalue()

    response_headers = {
        'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet; charset=utf-8',
        'Content-Disposition': f'attachment; filename={urllib.parse.quote(collection_name)}.xlsx'
    }
    return response.raw(excel_data, headers=response_headers)


@app.listener('after_server_stop')
async def close_db(app, loop):
    """Close the MongoDB client once the server has stopped."""
    app.ctx.client.close()


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8888)
前端(保存为 templates/index.html)
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>表格导出</title>
    <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
</head>
<body>
<!-- One row per collection: its name plus a button that downloads it as xlsx. -->
<ul>
    {% for name in collection_names %}
    <li>
        <span>{{ name }}</span> —————————————————————— <button class="export-btn">导出成excel</button>
    </li>
    {% endfor %}
</ul>
<script>
    $(document).ready(function () {
        $(".export-btn").click(function (event) {
            event.preventDefault();

            // Read the collection name from the <span> in the same <li>
            // rather than walking sibling nodes, which breaks as soon as the
            // markup between the span and the button changes.
            var name = $(this).closest('li').find('span').first().text();

            $.ajax({
                // Encode the name so characters such as & # + or spaces
                // cannot corrupt the query string.
                url: `/export?collection_name=${encodeURIComponent(name)}`,
                type: 'GET',
                xhrFields: {
                    responseType: 'blob' // receive the workbook as binary data
                },
                success: function (data, textStatus, jqXHR) {
                    // Fall back to "<name>.xlsx" when the header is missing
                    // or cannot be parsed.
                    var filename = name + '.xlsx';
                    var disposition = jqXHR.getResponseHeader('Content-Disposition') || '';
                    var match = disposition.match(/filename[^;=\n]*=((['"]).*?\2|[^;\n]*)/);
                    if (match && match[1]) {
                        try {
                            // Strip surrounding quotes first, then decode.
                            // decodeURIComponent is required because the
                            // server percent-encodes reserved characters,
                            // which decodeURI leaves untouched.
                            filename = decodeURIComponent(match[1].replace(/['"]/g, ''));
                        } catch (e) {
                            console.error('Error:', e);
                        }
                    }

                    var blob = new Blob([data], {type: jqXHR.getResponseHeader('Content-Type')});
                    var objectUrl = URL.createObjectURL(blob);

                    // Trigger the download through a hidden temporary link.
                    var a = document.createElement('a');
                    a.href = objectUrl;
                    a.download = filename;
                    a.style.display = 'none';
                    document.body.appendChild(a);
                    a.click();
                    document.body.removeChild(a);

                    // Release the blob URL after the click has been handled.
                    setTimeout(function () {
                        URL.revokeObjectURL(objectUrl);
                    }, 0);
                },
                error: function (jqXHR, textStatus, errorThrown) {
                    console.error('Error:', errorThrown);
                }
            });
        });
    });
</script>
</body>
</html>