Handle paginated API responses in data sources to fetch large result sets efficiently.
🤖 AI-Generated Content
This documentation was generated with AI assistance and is still being audited; some of the information may be inaccurate. Learn more.
import attrs
import httpx

from pyvider.data_sources import register_data_source, BaseDataSource
# FIX: a_map and a_bool are used in get_schema() below but were not imported.
from pyvider.schema import (
    s_data_source,
    a_str,
    a_num,
    a_list,
    a_map,
    a_bool,
    PvsSchema,
)


@attrs.define
class PaginatedQueryConfig:
    endpoint: str
    limit: int = 100  # Max results to fetch


@attrs.define
class PaginatedQueryData:
    id: str
    items: list[dict]
    total_fetched: int
    has_more: bool


@register_data_source("paginated_query")
class PaginatedQuery(BaseDataSource):
    """Data source that follows page-numbered API responses until `limit` items
    are collected or the API reports no further pages."""

    config_class = PaginatedQueryConfig
    state_class = PaginatedQueryData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        return s_data_source({
            "endpoint": a_str(required=True),
            "limit": a_num(default=100),
            "id": a_str(computed=True),
            "items": a_list(a_map(a_str()), computed=True),
            "total_fetched": a_num(computed=True),
            "has_more": a_bool(computed=True),
        })

    async def read(self, config: PaginatedQueryConfig) -> PaginatedQueryData:
        """Fetch pages of up to 100 items each until `config.limit` is reached
        or the API's `has_more` flag goes false."""
        all_items = []
        page = 1
        has_more = True
        async with httpx.AsyncClient() as client:
            while has_more and len(all_items) < config.limit:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100},
                )
                data = response.json()
                all_items.extend(data["items"])
                has_more = data.get("has_more", False)
                page += 1
                # Stop if we have enough
                if len(all_items) >= config.limit:
                    break
        # FIX: if truncation below discards items we already fetched, more data
        # exists beyond what we return, so has_more must reflect that.
        return PaginatedQueryData(
            id=f"{config.endpoint}:{config.limit}",
            items=all_items[:config.limit],
            total_fetched=len(all_items[:config.limit]),
            has_more=has_more or len(all_items) > config.limit,
        )
async def read(self, config: Config) -> Data:
    """Collect up to `config.limit` items by following the API's `next_token`
    cursor until it is exhausted."""
    collected = []
    cursor = None
    async with httpx.AsyncClient() as client:
        while len(collected) < config.limit:
            # Ask only for as many items as we still need, capped at 100/page.
            query = {"limit": min(100, config.limit - len(collected))}
            if cursor:
                query["next_token"] = cursor
            response = await client.get(
                f"https://api.example.com{config.endpoint}", params=query
            )
            payload = response.json()
            collected.extend(payload["items"])
            cursor = payload.get("next_token")
            # No more pages
            if not cursor:
                break
    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=collected,
        total_fetched=len(collected),
    )
async def read(self, config: Data) -> Data:
    """Page through the API using offset/limit parameters, stopping at
    `config.limit` items or when a short/empty page signals the end."""
    results = []
    cursor_offset = 0
    batch_size = 100
    async with httpx.AsyncClient() as client:
        while len(results) < config.limit:
            # Never request more than we still need.
            want = min(batch_size, config.limit - len(results))
            response = await client.get(
                f"https://api.example.com{config.endpoint}",
                params={"offset": cursor_offset, "limit": want},
            )
            payload = response.json()
            batch = payload.get("items", [])
            if not batch:
                break  # No more results
            results.extend(batch)
            cursor_offset += len(batch)
            # Got fewer than requested = last page
            if len(batch) < want:
                break
    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=results,
        total_fetched=len(results),
    )
import httpx
# FIX: removed `from urllib.parse import parse_qs, urlparse` — neither name
# was used anywhere in this example.


async def read(self, config: Config) -> Data:
    """Collect items by following `Link` headers with rel="next" (RFC 8288
    web-linking pagination, as used by e.g. the GitHub API)."""
    all_items = []
    url = f"https://api.example.com{config.endpoint}"
    async with httpx.AsyncClient() as client:
        while url and len(all_items) < config.limit:
            # NOTE(review): per_page is re-sent even for "next" URLs that may
            # already carry query params — confirm httpx's merge semantics.
            response = await client.get(url, params={"per_page": 100})
            data = response.json()
            # This API returns a bare JSON array of items.
            all_items.extend(data)
            # Parse Link header for next page
            link_header = response.headers.get("Link", "")
            url = None
            for link in link_header.split(","):
                if 'rel="next"' in link:
                    url = link.split(";")[0].strip("<> ")
                    break
            if len(all_items) >= config.limit:
                break
    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=all_items[:config.limit],
        total_fetched=len(all_items[:config.limit]),
    )
import asyncio


async def read(self, config: Config) -> Data:
    """Fetch every required page concurrently and merge the results,
    silently dropping any page whose request raised."""
    # Determine total pages needed: ceiling of limit / 100.
    page_count = -(-config.limit // 100)
    async with httpx.AsyncClient() as client:
        # One coroutine per page, all launched at once.
        coros = (
            self._fetch_page(client, config.endpoint, n)
            for n in range(1, page_count + 1)
        )
        outcomes = await asyncio.gather(*coros, return_exceptions=True)

    # Combine results; exceptions mean a failed page and are skipped.
    merged = []
    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            merged.extend(outcome)

    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=merged[:config.limit],
        total_fetched=len(merged[:config.limit]),
    )


async def _fetch_page(
    self, client: httpx.AsyncClient, endpoint: str, page: int
) -> list[dict]:
    """Fetch a single page."""
    response = await client.get(
        f"https://api.example.com{endpoint}",
        params={"page": page, "per_page": 100},
    )
    return response.json().get("items", [])
import asyncio
from datetime import datetime, timedelta


class RateLimitedQuery(BaseDataSource):
    """Paginated data source that throttles itself to at most ten requests per
    second and backs off when the API returns 429."""

    def __init__(self):
        super().__init__()
        self._last_request = None
        self._min_interval = timedelta(milliseconds=100)  # 10 req/sec

    async def _wait_for_rate_limit(self):
        """Ensure we don't exceed rate limit."""
        if self._last_request:
            since_last = datetime.now() - self._last_request
            if since_last < self._min_interval:
                pause = (self._min_interval - since_last).total_seconds()
                await asyncio.sleep(pause)
        self._last_request = datetime.now()

    async def read(self, config: Config) -> Data:
        """Fetch pages until `config.limit` items are gathered or the API
        reports no more data, pacing requests via `_wait_for_rate_limit`."""
        fetched = []
        page_no = 1
        async with httpx.AsyncClient() as client:
            while len(fetched) < config.limit:
                # Respect rate limit
                await self._wait_for_rate_limit()
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page_no, "per_page": 100},
                )
                # Handle 429 Too Many Requests: honor Retry-After, then
                # retry the same page.
                if response.status_code == 429:
                    delay = int(response.headers.get("Retry-After", "60"))
                    await asyncio.sleep(delay)
                    continue
                payload = response.json()
                fetched.extend(payload["items"])
                if not payload.get("has_more"):
                    break
                page_no += 1
        return Data(
            id=f"{config.endpoint}:{config.limit}",
            items=fetched[:config.limit],
        )
async def read(self, config: Config) -> Data:
    """Fetch up to `config.limit` items across at most 10 pages, collecting
    per-page errors instead of failing the whole read.

    An httpx.HTTPError skips the failed page and moves on; any other
    exception stops pagination. Returns whatever was fetched, truncated
    to `config.limit`, plus the error list (or None if none occurred).
    """
    all_items = []
    page = 1
    errors = []
    async with httpx.AsyncClient() as client:
        while page <= 10 and len(all_items) < config.limit:  # Max 10 pages
            try:
                response = await client.get(
                    f"https://api.example.com{config.endpoint}",
                    params={"page": page, "per_page": 100},
                    timeout=30.0,  # Add timeout
                )
                response.raise_for_status()
                data = response.json()
                all_items.extend(data.get("items", []))
                if not data.get("has_more"):
                    break
                page += 1
            except httpx.HTTPError as e:
                errors.append(f"Page {page} failed: {e}")
                # Continue to next page instead of failing completely
                page += 1
                continue
            except Exception as e:
                errors.append(f"Unexpected error on page {page}: {e}")
                break  # Stop on unexpected errors
    # FIX: items is truncated to config.limit, so total_fetched must count the
    # truncated list (it previously counted all fetched items, disagreeing with
    # the returned items and with the other examples on this page).
    returned = all_items[:config.limit]
    return Data(
        id=f"{config.endpoint}:{config.limit}",
        items=returned,
        total_fetched=len(returned),
        errors=errors if errors else None,
    )
async def read(self, config: Config) -> Data:
    """Sketch of graceful degradation: on any failure, return the items
    collected so far together with the error instead of raising.

    The happy-path fetch loop is intentionally elided in this example.
    """
    all_items = []
    try:
        # Fetch pages...
        pass  # FIX: a try body cannot consist of only a comment (SyntaxError)
    except Exception as e:
        # Return what we got
        return Data(items=all_items, error=f"Partial results due to: {e}")
    # FIX: added an explicit success-path return (the original fell off the
    # end, which would have returned None).
    return Data(items=all_items)