Building fast, efficient Terraform providers is essential for a good user experience. This guide covers performance optimization techniques for Pyvider providers.
🤖 AI-Generated Content
This documentation was generated with AI assistance and has not yet been fully audited; some of this information may be inaccurate. Learn more.
import requests  # Synchronous library

async def read(self, ctx: ResourceContext):
    # This BLOCKS the entire event loop!
    response = requests.get(f"{self.api_url}/resource/{ctx.state.id}")
    return State(...)
import httpx  # Async library

async def read(self, ctx: ResourceContext):
    # Non-blocking, allows other operations to proceed
    response = await self.http_client.get(f"{self.api_url}/resource/{ctx.state.id}")
    return State(...)
async def _create_apply(self, ctx: ResourceContext):
    # These run one after another (slow)
    network = await self.create_network(ctx.config.network)
    storage = await self.create_storage(ctx.config.storage)
    compute = await self.create_compute(ctx.config.compute)
    return State(...), None
import asyncio

async def _create_apply(self, ctx: ResourceContext):
    # These run concurrently (fast)
    network, storage, compute = await asyncio.gather(
        self.create_network(ctx.config.network),
        self.create_storage(ctx.config.storage),
        self.create_compute(ctx.config.compute),
    )
    return State(...), None
async def read_multiple_resources(self, resource_ids: list[str]):
    """Read multiple resources in parallel."""
    # Create tasks for all reads
    tasks = [self.api.get_resource(resource_id) for resource_id in resource_ids]

    # Execute in parallel
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Handle results
    resources = []
    for result in results:
        if isinstance(result, Exception):
            logger.warning("Resource read failed", error=str(result))
            continue
        resources.append(result)

    return resources
import asyncio
from concurrent.futures import ThreadPoolExecutor

# If you MUST use a blocking library
executor = ThreadPoolExecutor(max_workers=4)

async def expensive_blocking_operation(self, data):
    """Run a blocking operation in the thread pool.

    Offloads ``self._blocking_function`` to ``executor`` so the event
    loop stays responsive while the blocking call runs.
    """
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated here since Python 3.10.
    loop = asyncio.get_running_loop()
    # Run in separate thread to avoid blocking the event loop
    result = await loop.run_in_executor(executor, self._blocking_function, data)
    return result

def _blocking_function(self, data):
    """Actual blocking operation."""
    # Blocking code here (placeholder for real work)
    return processed_data
from functools import lru_cache
from datetime import datetime, timedelta

class MyProvider(BaseProvider):
    def __init__(self):
        super().__init__(...)
        self._cache = {}
        self._cache_ttl = {}

    async def get_with_cache(self, key: str, fetcher: callable, ttl: int = 300):
        """Get data with time-based caching."""
        now = datetime.now()

        # Check cache
        if key in self._cache:
            cached_at = self._cache_ttl.get(key)
            if cached_at and (now - cached_at).total_seconds() < ttl:
                logger.debug("Cache hit", key=key)
                return self._cache[key]

        # Fetch fresh data
        logger.debug("Cache miss", key=key)
        data = await fetcher()

        # Update cache
        self._cache[key] = data
        self._cache_ttl[key] = now
        return data

# Usage
async def read(self, ctx: ResourceContext):
    # Cache region data for 5 minutes
    region_info = await self.get_with_cache(
        key=f"region:{self.config.region}",
        fetcher=lambda: self.api.get_region_info(self.config.region),
        ttl=300,
    )
    return State(...)
@register_provider("mycloud")
class MyCloudProvider(BaseProvider):
    def __init__(self):
        super().__init__(...)
        self._api_metadata_cache = None
        self._cache_expiry = None

    async def get_api_metadata(self):
        """Get API metadata with caching."""
        now = datetime.now()

        # Return cached if still valid
        if (
            self._api_metadata_cache
            and self._cache_expiry
            and now < self._cache_expiry
        ):
            return self._api_metadata_cache

        # Fetch fresh metadata
        metadata = await self.api.get_metadata()

        # Cache for 1 hour
        self._api_metadata_cache = metadata
        self._cache_expiry = now + timedelta(hours=1)
        return metadata
class CacheManager:
    def __init__(self):
        self._cache = {}
        self._ttl = {}

    async def get(self, key: str, fetcher: callable, ttl: int = 300):
        """Get from cache or fetch."""
        now = datetime.now()
        if key in self._cache:
            if (now - self._ttl[key]).total_seconds() < ttl:
                return self._cache[key]
        data = await fetcher()
        self._cache[key] = data
        self._ttl[key] = now
        return data

    def invalidate(self, key: str):
        """Invalidate specific cache entry."""
        self._cache.pop(key, None)
        self._ttl.pop(key, None)

    def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching pattern."""
        keys_to_remove = [key for key in self._cache.keys() if pattern in key]
        for key in keys_to_remove:
            self.invalidate(key)

    def clear(self):
        """Clear entire cache."""
        self._cache.clear()
        self._ttl.clear()

# Usage
async def _update_apply(self, ctx: ResourceContext):
    result = await self.api.update_resource(ctx.state.id, ctx.config)
    # Invalidate cache for this resource
    self.cache_manager.invalidate(f"resource:{ctx.state.id}")
    return State(...), None
async def read_all_servers(self, server_ids: list[str]):
    servers = []
    for server_id in server_ids:
        # Makes N API calls!
        server = await self.api.get_server(server_id)
        servers.append(server)
    return servers
from collections import defaultdict
import asyncio

class BatchingProvider(BaseProvider):
    def __init__(self):
        super().__init__(...)
        self._pending_reads = defaultdict(list)
        self._batch_lock = asyncio.Lock()
        self._batch_delay = 0.1  # 100ms batching window

    async def get_resource_batched(self, resource_id: str):
        """Get resource with automatic batching."""
        # Create future for this request
        future = asyncio.Future()

        async with self._batch_lock:
            self._pending_reads[resource_id].append(future)

            # Schedule batch execution
            if len(self._pending_reads) == 1:
                asyncio.create_task(self._execute_batch())

        # Wait for result
        return await future

    async def _execute_batch(self):
        """Execute batched reads after delay."""
        # Wait for batch window
        await asyncio.sleep(self._batch_delay)

        async with self._batch_lock:
            if not self._pending_reads:
                return

            # Collect all IDs
            resource_ids = list(self._pending_reads.keys())
            futures_by_id = dict(self._pending_reads)

            # Clear pending
            self._pending_reads.clear()

        # Execute batch read
        try:
            results = await self.api.batch_get_resources(resource_ids)

            # Resolve futures
            for resource_id, result in results.items():
                for future in futures_by_id[resource_id]:
                    future.set_result(result)
        except Exception as e:
            # Reject all futures
            for futures in futures_by_id.values():
                for future in futures:
                    future.set_exception(e)
# For high-traffic providers
limits = httpx.Limits(
    max_connections=200,
    max_keepalive_connections=50,
)

# For low-traffic providers
limits = httpx.Limits(
    max_connections=20,
    max_keepalive_connections=5,
)
from functools import cache

@register_resource("server")
class Server(BaseResource):
    # NOTE: stacking @classmethod on @cached_property does not work --
    # cached_property needs an instance __dict__ to store into, and the
    # classmethod/property chain was deprecated in Python 3.11 and removed
    # in 3.13. A cached staticmethod gives the same "compute once" result.
    @staticmethod
    @cache
    def _schema() -> PvsSchema:
        """Build the schema - computed once, then cached."""
        return s_resource({
            "name": a_str(required=True),
            "size": a_str(default="medium"),
            # ... large schema definition
        })

    @classmethod
    def get_schema(cls) -> PvsSchema:
        """Return the (cached) resource schema."""
        return cls._schema()
# Deeply nested with many validations
@classmethod
def get_schema(cls):
    return s_resource({
        "config": a_obj({
            "network": a_obj({
                "vpc": a_obj({
                    "subnets": a_list(a_obj({
                        "cidr": a_str(validators=[...]),
                        "az": a_str(validators=[...]),
                        # etc...
                    }))
                })
            })
        })
    })
# Bad - Storing unnecessary data
@attrs.define
class ServerState:
    id: str
    full_config: dict  # Don't store entire config
    api_response: dict  # Don't store raw API response
    internal_metadata: dict  # Don't store internal data

# Good - Only essential data
@attrs.define
class ServerState:
    id: str
    name: str
    status: str
    public_ip: str
# Only use for truly sensitive data
async def _create_apply(self, ctx):
    # Public state - fast
    state = State(id="srv-123", status="running")

    # Private state - slower due to encryption
    # Only for sensitive data
    private = {
        "admin_password": "secret123",
        "internal_token": "tok_abc",
    } if ctx.config.store_credentials else None

    return state, private
async def read(self, ctx):
    # Only fetch what you need
    resource = await self.api.get_resource(ctx.state.id, fields=["id", "status"])
    return State(id=resource.id, status=resource.status)
import asyncio
import time

async def benchmark_provider():
    """Benchmark provider operations."""
    provider = MyCloudProvider()
    await provider.configure({
        "api_endpoint": "https://api.example.com",
        "api_key": "test_key",
    })

    # Benchmark resource creation
    iterations = 100
    start = time.time()

    tasks = [create_test_resource(provider, i) for i in range(iterations)]
    results = await asyncio.gather(*tasks)

    duration = time.time() - start
    ops_per_second = iterations / duration

    print(f"Created {iterations} resources in {duration:.2f}s")
    print(f"Throughput: {ops_per_second:.2f} ops/sec")

async def create_test_resource(provider, i):
    """Create a test resource."""
    config = ResourceConfig(name=f"test-{i}")
    state, _ = await provider.create_resource(config)
    return state