您的位置 首页 知识

python内置的http请求库Python_httpx库终极指南实战案例p

目录
  • 一、进步历程与技术定位
    • 1.1 历史演进
    • 1.2 设计哲学
    • 1.3 适用场景
  • 二、核心功能与基础用法
    • 2.1 安装配置
    • 2.2 请求技巧全景
    • 2.3 响应处理
  • 三、高质量特性与性能优化
    • 3.1 HTTP/2 多路复用
    • 3.2 连接池配置
    • 3.3 重试策略实现
  • 四、企业级功能扩展
    • 4.1 分布式追踪
    • 4.2 安全操作
    • 4.3 代理配置
  • 五、与 Requests 的对比
    • 5.1 功能对比表
    • 5.2 性能对比数据
  • 六、完整代码案例
    • 6.1 异步高并发采集
    • 6.2 OAuth2 客户端
    • 6.3 文件分块上传
  • 七、架构建议
    • 7.1 客户端分层设计
    • 7.2 监控指标
  • 八、迁移指南
    • 8.1 从 Requests 迁移
    • 8.2 常见差异处理
  • 九、最佳操作
    • 十、调试与故障排除
      • 10.1 请求日志记录
      • 10.2 常见错误处理
    • 十一、高质量认证机制
      • 11.1 JWT 自动刷新
      • 11.2 AWS Sigv4 签名
    • 十二、流式处理进阶
      • 12.1 分块上传大文件
      • 12.2 实时流式响应处理
    • 十三、自定义中间件与传输层
      • 13.1 请求重试中间件
      • 13.2 修改请求头中间件
    • 十四、性能调优实战
      • 14.1 性能分析工具
      • 14.2 连接池优化配置
    • 十五、与异步框架深度集成
      • 15.1 在 FastAPI 中使用
      • 15.2 集成 Celery 异步任务
    • 十六、安全最佳操作
      • 16.1 证书固定
      • 16.2 敏感数据防护
    • 十七、实战案例:分布式爬虫
      • 十八、扩展进修资源
        • 18.1 官方文档
      • 拓展资料

        一、进步历程与技术定位

        1.1 历史演进

        • 起源httpx 由 Encode 团队开发,于 2019 年首次发布,目标是提供一个现代化的 HTTP 客户端,支持同步和异步操作,并兼容 HTTP/1.1 和 HTTP/2。
        • 背景
          • requests 库虽然功能强大,但缺乏对异步和 HTTP/2 的原生支持。
          • httpx 应运而生,弥补了 requests 的不足,同时保持了类似的 API 设计。
        • 核心优势
          • 同步和异步双模式。
          • 支持 HTTP/2。
          • 类型提示完善,兼容 Python 3.6+。
        版本 里程碑特性 发布时刻
        0.1 初始版本发布 2019.01
        0.18 正式支持 HTTP/2 2020.09
        0.21 顶层异步 API 引入 2021.03
        0.24 完整类型注解支持 2021.10
        0.26 WebSocket 正式支持 2022.04

        1.2 设计哲学

        • 双模式统一:同一 API 同时支持同步和异步编程范式
        • 协议现代化:原生支持 HTTP/2 和 WebSocket
        • 类型安全:100% 类型提示覆盖,兼容 mypy 静态检查
        • 生态集成:成为 FastAPI/Starlette 官方推荐客户端

        1.3 适用场景

        • 需要异步 HTTP 请求的 Web 应用
        • 高并发 API 调用场景
        • HTTP/2 服务交互
        • 需要严格类型检查的大型项目

        二、核心功能与基础用法

        核心特性

        • 同步与异步:同一 API 支持同步 httpx.get() 和异步 await httpx.AsyncClient().get()
        • HTTP/2 支持:通过 http2=True 启用。
        • 连接池管理:自动复用连接,提升性能。
        • 类型安全:代码完全类型注释,IDE 友好。
        • WebSocket 支持:通过 httpx.WebSocketSession 实现。
        • 文件上传与流式传输:支持大文件分块上传和流式响应。

        2.1 安装配置

        基础安装pip install httpx 完整功能安装(HTTP/2 + 代理支持)pip install “httpx[http2,socks]”

        2.2 请求技巧全景

        import httpx 同步客户端with httpx.Client() as client: RESTful 全技巧支持 client.get(url, params=…}) client.post(url, json=…}) client.put(url, data=…}) client.patch(url, files=…}) client.delete(url) 异步客户端async with httpx.AsyncClient() as client: await client.get(…)

        2.3 响应处理

        response = httpx.get(“https://api.example.com/data”) 常用属性和技巧print(response.status_code) HTTP 情形码print(response.headers) 响应头print(response.text) 文本内容print(response.json()) JSON 解码print(response.content) 二进制内容print(response.stream()) 流式访问

        三、高质量特性与性能优化

        3.1 HTTP/2 多路复用

        启用 HTTP/2client = httpx.Client(http2=True)response = client.get(“https://http2.example.com”)print(response.http_version) 输出: “HTTP/2”

        3.2 连接池配置

        优化连接参数custom_client = httpx.Client( limits=httpx.Limits( max_keepalive_connections=20, 长连接上限 max_connections=100, 总连接数 keepalive_expiry=30 空闲时刻(s) ), timeout=10.0 默认超时)

        3.3 重试策略实现

        from tenacity import retry, stop_after_attempt, wait_exponential@retry(stop=stop_after_attempt(3), wait=wait_exponential())def reliable_request(): response = httpx.get(“https://unstable-api.example.com”) response.raise_for_status() return response

        四、企业级功能扩展

        4.1 分布式追踪

        OpenTelemetry 集成from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentorHTTPXClientInstrumentor().instrument()async def tracked_request(): async with httpx.AsyncClient() as client: await client.get(“https://api.example.com”) 自动生成追踪 Span

        4.2 安全操作

        证书配置secure_client = httpx.Client( verify=”/path/to/ca-bundle.pem”, 自定义 CA cert=(“/path/to/client-cert.pem”, “/path/to/client-key.pem”)) 敏感信息处理import osclient = httpx.Client( headers=”Authorization”: f”Bearer os.environ[‘API_TOKEN’]}”})

        4.3 代理配置

        SOCKS 代理from httpx_socks import AsyncProxyTransportproxy_transport = AsyncProxyTransport.from_url(“socks5://user:pass@host:port”)async with httpx.AsyncClient(transport=proxy_transport) as client: await client.get(“https://api.example.com”)

        五、与 Requests 的对比

        5.1 功能对比表

        功能 httpx requests
        异步支持 &x2705; 原生 &x274c; 仅同步
        HTTP/2 &x2705; &x274c;
        类型提示 完整支持 部分支持
        WebSocket &x2705; &x274c;
        连接池配置 精细化控制 基础配置

        5.2 性能对比数据

        基准测试结局(1000 请求)| 场景 | requests (s) | httpx 同步 (s) | httpx 异步 (s) ||—————|————–|—————–|—————–|| 短连接 HTTP/1 | 12.3 | 11.8 (+4%) | 2.1 (+83%) || 长连接 HTTP/2 | N/A | 9.5 | 1.7 |

        六、完整代码案例

        6.1 异步高并发采集

        import httpximport asyncioasync def fetch(url: str, client: httpx.AsyncClient): response = await client.get(url) return response.text[:100] 截取部分内容async def main(): urls = [f”https://httpbin.org/get?q=i}” for i in range(10)] async with httpx.AsyncClient(timeout=10.0) as client: tasks = [fetch(url, client) for url in urls] results = await asyncio.gather(*tasks) for url, result in zip(urls, results): print(f”url}: result}”)asyncio.run(main())

        6.2 OAuth2 客户端

        from httpx import OAuth2, AsyncClientasync def oauth2_flow(): auth = OAuth2( client_id=”CLIENT_ID”, client_secret=”SECRET”, token_endpoint=”https://auth.example.com/oauth2/token”, grant_type=”client_credentials” ) async with AsyncClient(auth=auth) as client: 自动处理 Token 获取和刷新 response = await client.get(“https://api.example.com/protected”) return response.json()

        6.3 文件分块上传

        import httpxfrom tqdm import tqdmdef chunked_upload(url: str, file_path: str, chunk_size: int = 1024*1024): with open(file_path, “rb”) as f: file_size = f.seek(0, 2) f.seek(0) with tqdm(total=file_size, unit=”B”, unit_scale=True) as pbar: with httpx.Client(timeout=None) as client: 禁用超时 while True: chunk = f.read(chunk_size) if not chunk: break response = client.post( url, files=”file”: chunk}, headers=”Content-Range”: f”bytes f.tell()-len(chunk)}-f.tell()-1}/file_size}”} ) pbar.update(len(chunk)) return response.status_code

        七、架构建议

        7.1 客户端分层设计

        7.2 监控指标

        指标类别 具体指标
        连接池 活跃连接数/空闲连接数
        性能 平均响应时刻/99 分位值
        成功率 2xx/3xx/4xx/5xx 比例
        流量 请求量/响应体积

        八、迁移指南

        8.1 从 Requests 迁移

        原 Requests 代码import requestsresp = requests.get( “https://api.example.com/data”, params=”page”: 2}, headers=”X-API-Key”: “123”}) 等效 httpx 代码import httpxresp = httpx.get( “https://api.example.com/data”, params=”page”: 2}, headers=”X-API-Key”: “123”})

        8.2 常见差异处理

        超时设置

        Requestsrequests.get(url, timeout=(3.05, 27)) httpxhttpx.get(url, timeout=30.0) 统一超时控制

        会话管理

        Requestswith requests.Session() as s: s.get(url) httpxwith httpx.Client() as client: client.get(url)

        九、最佳操作

        • 客户端复用:始终重用 Client 实例提升性能
        • 超时设置:全局超时 + 各操作单独配置
        • 类型安全:结合 Pydantic 进行响应验证
        • 异步优先:在高并发场景使用 AsyncClient
        • 监控告警:关键指标埋点 + 异常报警

        十、调试与故障排除

        10.1 请求日志记录

        import loggingimport httpx 配置详细日志记录logging.basicConfig(level=logging.DEBUG) 自定义日志格式httpx_logger = logging.getLogger(“httpx”)httpx_logger.setLevel(logging.DEBUG) 示例请求client = httpx.Client(event_hooks= “request”: [lambda req: print(f”>>> 发送请求: req.method} req.url}”)], “response”: [lambda res: print(f”<<< 收到响应: res.status_code}”)],})client.get(“https://httpbin.org/get”)

        10.2 常见错误处理

        try: response = httpx.get( “https://example.com”, timeout=3.0, follow_redirects=True 自动处理重定向 ) response.raise_for_status()except httpx.HTTPStatusError as e: print(f”HTTP 错误: e.response.status_code}”) print(f”响应内容: e.response.text}”)except httpx.ConnectTimeout: print(“连接超时,请检查网络或增加超时时刻”)except httpx.ReadTimeout: print(“服务器响应超时”)except httpx.TooManyRedirects: print(“重定向次数过多,请检查 URL”)except httpx.RequestError as e: print(f”请求失败: str(e)}”)

        十一、高质量认证机制

        11.1 JWT 自动刷新

        from httpx import Auth, AsyncClientimport timeclass JWTAuth(Auth): def __init__(self, token_url, client_id, client_secret): self.token_url = token_url self.client_id = client_id self.client_secret = client_secret self.access_token = None self.expires_at = 0 async def async_auth_flow(self, request): if time.time() > self.expires_at – 30: 提前30秒刷新 await self._refresh_token() request.headers[“Authorization”] = f”Bearer self.access_token}” yield request async def _refresh_token(self): async with AsyncClient() as client: response = await client.post( self.token_url, data= “grant_type”: “client_credentials”, “client_id”: self.client_id, “client_secret”: self.client_secret } ) token_data = response.json() self.access_token = token_data[“access_token”] self.expires_at = time.time() + token_data[“expires_in”] 使用示例auth = JWTAuth( token_url=”https://auth.example.com/token”, client_id=”your-client-id”, client_secret=”your-secret”)async with AsyncClient(auth=auth) as client: response = await client.get(“https://api.example.com/protected”)

        11.2 AWS Sigv4 签名

        需要安装 httpx-authfrom httpx_auth import AwsAuthauth = AwsAuth( aws_access_key_id=”AKIA…”, aws_secret_access_key=”…”, aws_session_token=”…”, 可选 region=”us-west-2″, service=”execute-api”)response = httpx.get( “https://api.example.com/aws-resource”, auth=auth)

        十二、流式处理进阶

        12.1 分块上传大文件

        import httpximport osfrom tqdm import tqdmdef upload_large_file(url, file_path, chunk_size=1024*1024): file_size = os.path.getsize(file_path) headers = “Content-Length”: str(file_size), “Content-Type”: “application/octet-stream” } with open(file_path, “rb”) as f, tqdm(total=file_size, unit=”B”, unit_scale=True) as pbar: def generate(): while True: chunk = f.read(chunk_size) if not chunk: break pbar.update(len(chunk)) yield chunk with httpx.Client(timeout=None) as client: response = client.post( url, content=generate(), headers=headers ) return response.status_code 使用示例upload_large_file( “https://httpbin.org/post”, “large_file.zip”, chunk_size=5*1024*1024 5MB 分块)

        12.2 实时流式响应处理

        async def process_streaming_response(): async with httpx.AsyncClient() as client: async with client.stream(“GET”, “https://stream.example.com/live-data”) as response: async for chunk in response.aiter_bytes(): 实时处理数据块 print(f”收到 len(chunk)} 字节数据”) process_data(chunk) 自定义处理函数

        十三、自定义中间件与传输层

        13.1 请求重试中间件

        from httpx import AsyncClient, Request, Responseimport httpxclass RetryMiddleware: def __init__(self, max_retries=3): self.max_retries = max_retries async def __call__(self, request: Request, get_response): for attempt in range(self.max_retries + 1): try: response = await get_response(request) if response.status_code >= 500: raise httpx.HTTPStatusError(“Server error”, request=request, response=response) return response except (httpx.RequestError, httpx.HTTPStatusError) as e: if attempt == self.max_retries: raise await asyncio.sleep(2 attempt) return response 永远不会执行此处 创建自定义客户端client = AsyncClient( transport=httpx.AsyncHTTPTransport( retries=3, middleware=[RetryMiddleware(max_retries=3)])

        13.2 修改请求头中间件

        def add_custom_header_middleware(): async def middleware(request: Request, get_response): request.headers[“X-Request-ID”] = str(uuid.uuid4()) response = await get_response(request) return response return middlewareclient = AsyncClient( event_hooks= “request”: [add_custom_header_middleware()] })

        十四、性能调优实战

        14.1 性能分析工具

        使用 cProfile 分析请求性能import cProfileimport httpxdef profile_requests(): with httpx.Client() as client: for _ in range(100): client.get(“https://httpbin.org/get”)if __name__ == “__main__”: cProfile.run(“profile_requests()”, sort=”cumtime”)

        14.2 连接池优化配置

        optimized_client = httpx.AsyncClient( limits=httpx.Limits( max_connections=200, 最大连接数 max_keepalive_connections=50, 保持活跃的连接数 keepalive_expiry=60 空闲连接存活时刻 ), timeout=httpx.Timeout( connect=5.0, 连接超时 read=20.0, 读取超时 pool=3.0 连接池等待超时 ), http2=True 启用 HTTP/2)

        十五、与异步框架深度集成

        15.1 在 FastAPI 中使用

        from fastapi import FastAPI, Dependsfrom httpx import AsyncClientapp = FastAPI()async def get_async_client(): async with AsyncClient(base_url=”https://api.example.com”) as client: yield client@app.get(“/proxy-data”)async def proxy_data(client: AsyncClient = Depends(get_async_client)): response = await client.get(“/remote-data”) return response.json()

        15.2 集成 Celery 异步任务

        from celery import Celeryfrom httpx import AsyncClientapp = Celery(“tasks”, broker=”pyamqp://guest@localhost//”)@app.taskdef sync_http_request(): with httpx.Client() as client: return client.get(“https://api.example.com/data”).json()@app.taskasync def async_http_request(): async with AsyncClient() as client: response = await client.get(“https://api.example.com/data”) return response.json()

        十六、安全最佳操作

        16.1 证书固定

        使用指纹验证证书client = httpx.Client( verify=True, limits=httpx.Limits(max_keepalive_connections=5), cert=(“/path/client.crt”, “/path/client.key”), 证书指纹校验 transport=httpx.HTTPTransport( verify=httpx.SSLConfig( cert_reqs=”CERT_REQUIRED”, ca_certs=”/path/ca.pem”, fingerprint=”sha256:…” ) ))

        16.2 敏感数据防护

        from pydantic import SecretStrclass SecureClient: def __init__(self, api_key: SecretStr): self.client = httpx.Client( headers=”Authorization”: f”Bearer api_key.get_secret_value()}”}, timeout=30.0 ) def safe_request(self): try: return self.client.get(“https://secure-api.example.com”) except httpx.RequestError: 记录错误但不暴露密钥 log.error(“API请求失败”) 使用secure_client = SecureClient(api_key=SecretStr(“s3cr3t”))

        十七、实战案例:分布式爬虫

        import httpximport asynciofrom bs4 import BeautifulSoupfrom urllib.parse import urljoinclass AsyncCrawler: def __init__(self, base_url, concurrency=10): self.base_url = base_url self.seen_urls = set() self.semaphore = asyncio.Semaphore(concurrency) self.client = httpx.AsyncClient(timeout=10.0) async def crawl(self, path=”/”): url = urljoin(self.base_url, path) if url in self.seen_urls: return self.seen_urls.add(url) async with self.semaphore: try: response = await self.client.get(url) if response.status_code == 200: await self.parse(response) except httpx.RequestError as e: print(f”请求失败: url} – str(e)}”) async def parse(self, response): soup = BeautifulSoup(response.text, “html.parser”) 提取数据 print(f”解析页面: response.url}”) 提取链接继续爬取 for link in soup.find_all(“a”, href=True): await self.crawl(link[“href”]) async def run(self): await self.crawl() await self.client.aclose() 启动爬虫async def main(): crawler = AsyncCrawler(“https://example.com”) await crawler.run()asyncio.run(main())

        十八、扩展进修资源

        18.1 官方文档

        HTTPS 官方文档

        HTTPX GitHub 仓库

        拓展资料

        通过本指南的深度扩展,无论兄弟们已经掌握了:

        高质量调试技巧:包括日志配置和精细化错误处理企业级认证方案:JWT自动刷新和AWS签名实现流式处理最佳操作:大文件分块上传和实时流处理自定义扩展能力:中间件开发和传输层定制性能调优策略:连接池配置和性能分析工具使用框架集成模式:与FastAPI、Celery等框架的深度整合安全防护方案:证书固定和敏感数据处理完整实战案例:分布式异步爬虫的实现

        无论兄弟们可能感兴趣的文章:

        • Python?httpx库终极指南实战案例
        • python中httpx库的详细使用技巧及案例详解
        • pythonHTTPX库实现同步异步请求用法示例
        • Python?Httpx库实现超跑式网络请求用法实例
        • Pythonhttpx库入门指南(最新推荐)
        • python新一代网络请求库之python-httpx库操作指南
        • python爬虫请求库httpx和parsel解析库的使用测评