Base classes and utilities for creating Terraform data sources (read-only resources).
🤖 AI-Generated Content
This documentation was generated with AI assistance and is still being audited. Some of this information, potentially a lot of it, may be inaccurate. Learn more.
Data sources implement a single `read(ctx: ResourceContext)` method that:
- Reads its configuration from `ctx.config`
- Queries external systems
- Returns data as computed attributes
- Does not modify any state
import attrs

from pyvider.data_sources import register_data_source, BaseDataSource
from pyvider.resources.context import ResourceContext
from pyvider.schema import s_data_source, a_str, a_num, PvsSchema
from pyvider.exceptions import DataSourceError


@attrs.define
class UserConfig:
    """Data source lookup configuration."""

    # Identifier of the user to look up (the schema requires it to be > 0).
    user_id: int


@attrs.define
class UserData:
    """User data returned by data source."""

    # `id` and `user_id` are both filled from the API user's `id` in `read`.
    id: int
    user_id: int
    name: str
    email: str
    username: str


@register_data_source("user")
class User(BaseDataSource):
    """
    Looks up user information by ID.

    This is a read-only data source that fetches user details
    from an external API.
    """

    config_class = UserConfig
    data_class = UserData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        """Define data source schema.

        Returns:
            A schema with one required input (``user_id``) and four computed
            outputs (``id``, ``name``, ``email``, ``username``).
        """
        return s_data_source({
            # Input (required)
            "user_id": a_num(
                required=True,
                description="User ID to lookup",
                validators=[
                    # Yields True when valid, otherwise the error message.
                    lambda x: x > 0 or "User ID must be positive",
                ],
            ),
            # Computed outputs
            "id": a_num(computed=True, description="User ID"),
            "name": a_str(computed=True, description="Full name"),
            "email": a_str(computed=True, description="Email address"),
            "username": a_str(computed=True, description="Username"),
        })

    async def read(self, ctx: ResourceContext) -> UserData | None:
        """Fetch user data from API.

        Args:
            ctx: Resource context; ``ctx.config.user_id`` selects the user.

        Returns:
            The populated ``UserData``, or ``None`` when ``ctx.config`` is
            falsy.

        Raises:
            DataSourceError: If the API call fails with an error whose text
                contains ``"404"``. The original error is attached as the
                cause.
            Exception: Any other API error is re-raised unchanged.
        """
        if not ctx.config:
            return None

        # NOTE(review): function-scope import kept from the original snippet;
        # presumably it avoids an import cycle with pyvider.hub - confirm.
        from pyvider.hub import hub

        provider = hub.get_component("singleton", "provider")

        # Only the API call sits in the try block, so an unrelated failure
        # while building UserData is never misreported as "not found".
        try:
            user = await provider.api.get_user(ctx.config.user_id)
        except Exception as e:
            # NOTE(review): matching on the message text is fragile (a
            # message that merely mentions "404" also matches); prefer a
            # typed not-found error if the API client exposes one.
            if "404" in str(e):
                raise DataSourceError(
                    f"User {ctx.config.user_id} not found"
                ) from e
            raise

        return UserData(
            id=user.id,
            user_id=user.id,
            name=user.name,
            email=user.email,
            username=user.username,
        )
# The schema below uses the map/list/object factories, which the file's
# existing pyvider.schema import (s_data_source, a_str, a_num, PvsSchema)
# does not bring into scope.
from pyvider.schema import a_list, a_map, a_obj


@attrs.define
class ServerFilterConfig:
    """Filter configuration."""

    region: str
    status: str | None = None
    tags: dict[str, str] | None = None


@attrs.define
class ServerListData:
    """Filtered server list."""

    # Each entry is a dict with "id", "name" and "status" keys (see `read`).
    servers: list[dict]
    # Number of entries in `servers`.
    count: int


@register_data_source("servers")
class Servers(BaseDataSource):
    """Fetch filtered list of servers."""

    config_class = ServerFilterConfig
    data_class = ServerListData

    @classmethod
    def get_schema(cls) -> PvsSchema:
        """Define the filter inputs and the computed server list outputs."""
        return s_data_source({
            # Filter inputs
            "region": a_str(required=True, description="Filter by region"),
            "status": a_str(description="Filter by status"),
            "tags": a_map(a_str(), description="Filter by tags"),
            # Computed outputs
            "servers": a_list(
                a_obj({
                    "id": a_str(),
                    "name": a_str(),
                    "status": a_str(),
                }),
                computed=True,
                description="List of matching servers",
            ),
            # NOTE(review): `count` is also the name of a Terraform
            # meta-argument; confirm it is accepted as an attribute name.
            "count": a_num(computed=True, description="Number of servers"),
        })

    async def read(self, ctx: ResourceContext) -> ServerListData | None:
        """Fetch and filter servers.

        Args:
            ctx: Resource context; ``ctx.config`` supplies ``region`` and the
                optional ``status`` and ``tags`` filters.

        Returns:
            The matching servers and their count, or ``None`` when
            ``ctx.config`` is falsy.
        """
        if not ctx.config:
            return None

        from pyvider.hub import hub

        provider = hub.get_component("singleton", "provider")

        # Build filter criteria; optional filters are sent only when set.
        filters = {"region": ctx.config.region}
        if ctx.config.status:
            filters["status"] = ctx.config.status
        if ctx.config.tags:
            filters["tags"] = ctx.config.tags

        # Fetch filtered servers
        servers = await provider.api.list_servers(filters)

        return ServerListData(
            servers=[
                {
                    "id": s.id,
                    "name": s.name,
                    "status": s.status,
                }
                for s in servers
            ],
            count=len(servers),
        )
from datetime import datetime, timedelta, timezone


@register_data_source("region_info")
class RegionInfo(BaseDataSource):
    """Fetch region information with caching.

    Results are kept in per-instance dictionaries keyed by region name and
    reused until their expiry time has passed.
    """

    # How long a fetched region stays valid in the cache.
    CACHE_TTL = timedelta(minutes=5)

    def __init__(self):
        super().__init__()
        self._cache = {}  # region name -> cached RegionData
        self._cache_expiry = {}  # region name -> aware UTC expiry datetime

    # NOTE(review): RegionData is not defined in this snippet; it is assumed
    # to be declared elsewhere with name/zones/available fields - confirm.
    async def read(self, ctx: ResourceContext) -> RegionData | None:
        """Fetch region info with caching.

        Args:
            ctx: Resource context; ``ctx.config.region`` names the region.

        Returns:
            The cached or freshly fetched ``RegionData``, or ``None`` when
            ``ctx.config`` is falsy.
        """
        if not ctx.config:
            return None

        region = ctx.config.region
        # Timezone-aware UTC, so local clock changes such as DST shifts
        # cannot stretch or shorten the TTL.
        now = datetime.now(timezone.utc)

        # Check cache
        expiry = self._cache_expiry.get(region)
        if expiry is not None and now < expiry and region in self._cache:
            return self._cache[region]

        # Fetch fresh data
        from pyvider.hub import hub

        provider = hub.get_component("singleton", "provider")
        region_data = await provider.api.get_region_info(region)

        data = RegionData(
            name=region_data.name,
            zones=region_data.zones,
            available=region_data.available,
        )

        # Store the result together with its expiry time.
        self._cache[region] = data
        self._cache_expiry[region] = now + self.CACHE_TTL

        return data
@register_data_source("image")
class Image(BaseDataSource):
    """Lookup image with validation."""

    @classmethod
    def get_schema(cls) -> PvsSchema:
        """Describe the validated inputs and the computed image outputs."""
        # Each validator yields True when the value is acceptable and the
        # error message string otherwise.
        image_name = a_str(
            required=True,
            validators=[
                lambda value: len(value) >= 3 or "Image name too short",
                lambda value: not value.startswith("_") or "Invalid image name",
            ],
        )
        architecture = a_str(
            validators=[
                lambda value: value in ["amd64", "arm64"] or "Invalid architecture",
            ],
        )

        attributes = {
            "image_name": image_name,
            "architecture": architecture,
            # Computed
            "id": a_str(computed=True),
            "version": a_str(computed=True),
            "created_at": a_str(computed=True),
        }
        return s_data_source(attributes)

    # NOTE(review): ImageData is not defined in this snippet; it is assumed
    # to be declared elsewhere - confirm.
    async def read(self, ctx: ResourceContext) -> ImageData | None:
        """Fetch image with validation.

        Returns ``None`` when ``ctx.config`` is falsy and raises
        ``DataSourceError`` when the API returns a falsy image.
        """
        if not ctx.config:
            return None

        from pyvider.hub import hub

        provider = hub.get_component("singleton", "provider")

        # Fetch image; an unset architecture falls back to "amd64".
        found = await provider.api.get_image(
            name=ctx.config.image_name,
            architecture=ctx.config.architecture or "amd64",
        )

        if found:
            return ImageData(
                id=found.id,
                version=found.version,
                created_at=found.created_at.isoformat(),
            )

        raise DataSourceError(f"Image '{ctx.config.image_name}' not found")
# Look up a user through the data source
data "mycloud_user" "admin" {
  user_id = 1
}

# Feed the data source output into a resource
resource "mycloud_server" "web" {
  name     = "web-server"
  owner_id = data.mycloud_user.admin.id
}

# List servers that match the filters
data "mycloud_servers" "production" {
  region = "us-east-1"
  status = "running"

  tags = {
    environment = "production"
  }
}

# NOTE(review): `count` is also a Terraform meta-argument name; confirm that
# the provider can expose an attribute with this name.
output "production_server_count" {
  value = data.mycloud_servers.production.count
}
import pytest

from pyvider.resources.context import ResourceContext


@pytest.mark.asyncio
async def test_user_data_source():
    """Test user lookup."""
    source = User()
    context = ResourceContext(config=UserConfig(user_id=1))

    result = await source.read(context)

    assert result is not None
    assert result.user_id == 1
    # Only presence is checked for the remaining fields.
    assert result.email
    assert result.username


@pytest.mark.asyncio
async def test_data_source_not_found():
    """Test handling of missing data."""
    source = User()
    context = ResourceContext(config=UserConfig(user_id=99999))

    with pytest.raises(DataSourceError, match="not found"):
        await source.read(context)


@pytest.mark.asyncio
async def test_data_source_filtering():
    """Test data source with filters."""
    source = Servers()
    server_filter = ServerFilterConfig(
        region="us-east-1",
        status="running",
        tags={"env": "prod"},
    )
    context = ResourceContext(config=server_filter)

    result = await source.read(context)

    assert result is not None
    assert result.count > 0
    # Every returned server must carry the requested status.
    assert all(server["status"] == "running" for server in result.servers)
# Good - Read-only
async def read(self, ctx):
    """Fetch the user and wrap the response; nothing is written."""
    user_fields = await provider.api.get_user(ctx.config.user_id)
    return UserData(**user_fields)


# Bad - Modifying state
async def read(self, ctx):
    """Counter-example: this read creates a user, which it must not do."""
    # Don't create/update/delete in data sources!
    created = await provider.api.create_user(...)  # ❌
    return UserData(**created)
# Cache data that changes infrequently
async def read(self, ctx):
    """Return the region from the cache, fetching and storing it on a miss."""
    cache_key = f"region:{ctx.config.region}"

    hit = self._get_from_cache(cache_key)
    if hit:
        return hit

    # Miss (or falsy cached value): fetch and cache for 300 seconds.
    fetched = await provider.api.get_region(ctx.config.region)
    self._cache(cache_key, fetched, ttl=300)
    return fetched
@classmethod
def get_schema(cls):
    """Build a schema whose required ``id`` input is validated."""
    # Each validator yields True when the value passes and the error message
    # string otherwise.
    id_validators = [
        lambda value: value.isalnum() or "ID must be alphanumeric",
        lambda value: len(value) <= 64 or "ID too long",
    ]
    id_attribute = a_str(required=True, validators=id_validators)
    return s_data_source({
        "id": id_attribute,
    })