Building secure Terraform providers is critical as they often handle sensitive credentials, infrastructure access, and confidential data. This guide covers security best practices for Pyvider providers.
🤖 AI-Generated Content
This documentation was generated with AI assistance and is still being audited. Some of this information — potentially a large part of it — may be inaccurate. Learn more.
@register_provider("mycloud")
class MyCloudProvider(BaseProvider):
    """Example provider that takes its API key from the Terraform configuration."""

    async def configure(self, config: dict) -> None:
        """Run the base configuration, then keep the API key on the instance.

        Args:
            config: Provider configuration mapping; must contain ``"api_key"``.

        Raises:
            KeyError: If ``"api_key"`` is absent from ``config``.
        """
        await super().configure(config)

        # API key comes from Terraform configuration
        api_key = config["api_key"]
        self.api_key = api_key
from pyvider.resources import BaseResource
from pyvider.resources.private_state import PrivateState


@register_resource("database")
class Database(BaseResource):
    """Example resource that splits its data into public and private state."""

    async def _create_apply(
        self, ctx: ResourceContext
    ) -> tuple[State | None, dict | None]:
        """Create the database and return ``(public_state, private_state)``.

        Non-secret attributes go into the public ``State``; the master
        password and internal token are returned separately as private state.
        """
        # Create database
        created = await self.create_database(ctx.config)

        # Public state (visible in terraform.tfstate)
        visible = State(id=created.id, endpoint=created.endpoint, port=created.port)

        # Private state (encrypted, not in terraform.tfstate)
        hidden = {
            "master_password": created.master_password,
            "internal_token": created.internal_token,
        }

        return visible, hidden
def _build_schema(self) -> PvsSchema:
    """Build the provider schema with per-attribute input validators.

    Every validator returns ``True`` for an acceptable value and the
    error-message string otherwise.
    """
    allowed_regions = ["us-east-1", "us-west-2", "eu-central-1"]

    endpoint_validators = [
        lambda x: x.startswith("https://") or "API endpoint must use HTTPS",
        lambda x: len(x) < 2048 or "URL too long",
    ]
    port_validators = [
        lambda x: 1 <= x <= 65535 or "Port must be 1-65535",
    ]
    region_validators = [
        lambda x: x in allowed_regions or "Invalid region",
    ]

    return s_provider({
        "api_endpoint": a_str(required=True, validators=endpoint_validators),
        "port": a_num(validators=port_validators),
        "region": a_str(validators=region_validators),
    })
import re

# Letters, digits, hyphens and underscores only. Used with ``fullmatch`` so
# the whole string must match; no ``^``/``$`` anchors are needed.
_RESOURCE_NAME_RE = re.compile(r"[a-zA-Z0-9_-]+")
_MAX_RESOURCE_NAME_LENGTH = 255


def validate_resource_name(name: str) -> str | bool:
    """Validate resource name is safe.

    Args:
        name: Candidate resource name.

    Returns:
        ``True`` when the name is acceptable, otherwise an error-message
        string describing the first rule that failed.
    """
    # Only allow alphanumeric, hyphens, underscores.
    # ``fullmatch`` is required here: with ``re.match(r'^...$')`` the ``$``
    # also matches just before a trailing newline, so "name\n" would pass.
    if not _RESOURCE_NAME_RE.fullmatch(name):
        return "Name must contain only letters, numbers, hyphens, and underscores"

    # Prevent overly long names
    if len(name) > _MAX_RESOURCE_NAME_LENGTH:
        return "Name must be 255 characters or less"

    return True


@classmethod
def get_schema(cls) -> PvsSchema:
    """Return a resource schema whose ``name`` is checked by ``validate_resource_name``."""
    return s_resource({
        "name": a_str(required=True, validators=[validate_resource_name]),
    })
frompathlibimportPathasyncdef_create_apply(self,ctx:ResourceContext):# Get user-provided pathrequested_path=Path(ctx.config.path)# Prevent path traversal attacksif".."instr(requested_path):raiseValueError("Path cannot contain '..'")# Resolve to absolute pathabs_path=requested_path.resolve()# Ensure it's within allowed directoryallowed_base=Path("/var/data").resolve()ifnotstr(abs_path).startswith(str(allowed_base)):raiseValueError(f"Path must be within {allowed_base}")# Now safe to useabs_path.write_text(ctx.config.content)
def _build_schema(self) -> PvsSchema:
    """Build a resource schema that caps the size of its inputs.

    Every validator returns ``True`` for an acceptable value and the
    error-message string otherwise.
    """
    max_content_length = 1_000_000
    max_tag_count = 100

    # Max 1MB of content
    # NOTE(review): len() counts characters, not encoded bytes - confirm this
    # is the intended unit for the "1MB" limit.
    content_validators = [
        lambda x: len(x) <= max_content_length or "Content too large (max 1MB)",
    ]

    # Max 100 tags
    tag_validators = [
        lambda x: len(x) <= max_tag_count or "Too many tags (max 100)",
    ]

    return s_resource({
        "content": a_str(required=True, validators=content_validators),
        "tags": a_list(a_str(), validators=tag_validators),
    })
# Private state is automatically encrypted by Pyvider
async def _create_apply(self, ctx):
    """Generate credentials and return them as private state.

    Returns:
        A ``(state, private)`` pair: ``state`` carries only the id and
        endpoint, ``private`` carries the generated access and secret keys.
    """
    creds = await self.generate_credentials()

    public = State(
        id="resource-123",
        endpoint="https://api.example.com",
    )

    # Store credentials in encrypted private state
    secret_material = {}
    secret_material["access_key"] = creds.access_key
    secret_material["secret_key"] = creds.secret_key

    return public, secret_material
# Bad - Secret in state
state = State(
    id="db-123",
    # Visible in terraform.tfstate
    password="supersecret",
)

# Good - Reference to secret manager
state = State(
    id="db-123",
    # Reference only
    password_secret_id="arn:aws:secretsmanager:...",
)
async def read(self, ctx: ResourceContext) -> State | None:
    """Refresh the resource, validating the stored state against the API.

    Returns:
        ``None`` when there is no prior state or the resource no longer
        exists remotely; otherwise the refreshed ``State``.

    Raises:
        StateCorruptionError: If the remote resource type differs from the
            type recorded in state.
    """
    prior = ctx.state
    if not prior:
        return None

    # Verify the resource actually exists
    try:
        remote = await self.api.get_resource(prior.id)
    except ResourceNotFoundError:
        logger.warning(
            "Resource not found during read - may have been deleted outside Terraform",
            resource_id=prior.id,
        )
        return None

    # Verify critical attributes match
    type_matches = remote.type == prior.type
    if not type_matches:
        logger.error(
            "Resource type mismatch - possible state corruption",
            expected=prior.type,
            actual=remote.type,
        )
        raise StateCorruptionError("Resource type has changed")

    return State(...)
import httpx


@register_provider("mycloud")
class MyCloudProvider(BaseProvider):
    """Example provider that authenticates with OAuth 2.0 and refreshes its token."""

    async def configure(self, config: dict) -> None:
        """Create the OAuth client from ``config`` and fetch the first access token.

        Args:
            config: Provider configuration; must contain ``"client_id"``,
                ``"client_secret"`` and ``"token_url"``.
        """
        await super().configure(config)

        # Use OAuth 2.0 with token refresh
        self.oauth_client = OAuth2Client(
            client_id=config["client_id"],
            client_secret=config["client_secret"],
            token_url=config["token_url"],
        )

        # Get initial token
        self.access_token = await self.oauth_client.get_token()

    async def _api_request(self, method: str, path: str, **kwargs):
        """Make authenticated API request with automatic token refresh.

        Args:
            method: HTTP method.
            path: Request path or URL handed to the HTTP client.
            **kwargs: Extra arguments forwarded to ``self.http_client.request``.
                An optional ``headers`` mapping is copied, never modified.

        Returns:
            The successful response object.

        Raises:
            httpx.HTTPStatusError: For any non-401 error status, or if the
                single retry after a token refresh also fails.
        """
        # NOTE(review): self.http_client is not assigned in this snippet -
        # presumably it is created elsewhere; confirm.

        # Work on a copy so the bearer token is never written into a dict the
        # caller still owns; this also tolerates ``headers=None``.
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {self.access_token}"

        try:
            response = await self.http_client.request(
                method, path, headers=headers, **kwargs
            )
            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 401:
                raise

            # Token expired - refresh and retry (once)
            self.access_token = await self.oauth_client.refresh_token()
            headers["Authorization"] = f"Bearer {self.access_token}"
            response = await self.http_client.request(
                method, path, headers=headers, **kwargs
            )
            response.raise_for_status()
            return response
async def configure(self, config: dict) -> None:
    """Create the HTTP client, verifying TLS certificates unless disabled.

    Args:
        config: Provider configuration; the optional ``"verify_ssl"`` entry
            defaults to ``True``.
    """
    await super().configure(config)

    # Default: Verify SSL certificates
    verify_ssl = config.get("verify_ssl", True)

    if not verify_ssl:
        insecure_notice = (
            "SSL verification disabled - this is insecure and should only be "
            "used in development environments"
        )
        logger.warning(insecure_notice)

    client_options = {
        "verify": verify_ssl,  # Verify by default
        "timeout": 30.0,
    }
    self.http_client = httpx.AsyncClient(**client_options)
importhashlibimporthmacfromdatetimeimportdatetimedefsign_request(method:str,url:str,secret_key:str,timestamp:datetime|None=None)->str:"""Sign API request with HMAC-SHA256."""iftimestampisNone:timestamp=datetime.utcnow()# Create canonical requestcanonical=f"{method}\n{url}\n{timestamp.isoformat()}"# Sign with HMAC-SHA256signature=hmac.new(secret_key.encode(),canonical.encode(),hashlib.sha256).hexdigest()returnsignature
from provide.foundation import logger

# Bad - Logs sensitive data
# NEVER
logger.info("Authenticating", api_key=api_key)

# Good - Logs safely
logger.info("Authenticating", api_key_length=len(api_key))
# No sensitive data
logger.info("Authenticating")
# Substrings that mark a key as sensitive (matched case-insensitively).
_SENSITIVE_KEY_MARKERS = ("password", "api_key", "secret", "token", "credential")
_REDACTED_PLACEHOLDER = "***REDACTED***"


def redact_sensitive(data: dict) -> dict:
    """Redact sensitive fields from data before logging.

    A key is sensitive when its lower-cased text contains one of
    ``password``, ``api_key``, ``secret``, ``token`` or ``credential``.
    Nested dicts are processed recursively, including dicts that sit inside
    lists or tuples. The input is never modified.

    Args:
        data: Mapping to sanitise.

    Returns:
        A new dict in which every sensitive value is replaced by
        ``"***REDACTED***"``.

    Example::

        logger.debug("API response", data=redact_sensitive(response_data))
    """
    redacted = {}
    for key, value in data.items():
        # str() so non-string keys (e.g. ints) do not crash on .lower()
        key_text = str(key).lower()
        if any(marker in key_text for marker in _SENSITIVE_KEY_MARKERS):
            redacted[key] = _REDACTED_PLACEHOLDER
        else:
            redacted[key] = _redact_value(value)
    return redacted


def _redact_value(value):
    """Return ``value`` with any dicts nested inside it redacted."""
    if isinstance(value, dict):
        return redact_sensitive(value)
    # Secrets often sit in lists of records, e.g. {"users": [{"password": ...}]}
    if isinstance(value, list):
        return [_redact_value(item) for item in value]
    if isinstance(value, tuple):
        return tuple(_redact_value(item) for item in value)
    return value
async def configure(self, config: dict) -> None:
    """Configure the provider and log the authentication outcome.

    Logs the API endpoint on success or failure; the API key itself is never
    passed to the logger.

    Raises:
        ProviderConfigurationError: If the API key fails validation.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        await super().configure(config)

        # Validate API key
        key_accepted = await self.validate_api_key(config["api_key"])

        if key_accepted:
            logger.info(
                "Authentication successful",
                api_endpoint=config["api_endpoint"],
            )
        else:
            logger.warning(
                "Authentication failed - invalid API key",
                api_endpoint=config["api_endpoint"],
            )
            raise ProviderConfigurationError("Invalid API key")

    except Exception as exc:
        logger.error(
            "Provider configuration failed",
            error=str(exc),
            error_type=type(exc).__name__,
        )
        raise
# Bad - Importing entire library for one function
import pandas as pd

data = pd.DataFrame(...)

# Good - Use stdlib when possible
import csv

data = list(csv.DictReader(...))
def _build_schema(self) -> PvsSchema:
    """Build a provider schema that only accepts HTTPS API endpoints.

    The validator returns ``True`` for an ``https://`` URL and the
    error-message string otherwise.
    """
    https_only = lambda x: x.startswith("https://") or "API endpoint must use HTTPS"

    return s_provider({
        "api_endpoint": a_str(required=True, validators=[https_only]),
    })
from datetime import datetime, timedelta
import asyncio


class RateLimiter:
    """Sliding-window rate limiter for async callers.

    Allows at most ``max_requests`` acquisitions within any ``time_window``.

    Attributes are plain instance fields:
        max_requests: Maximum acquisitions per window.
        time_window: Length of the window.
        requests: Timestamps of acquisitions still inside the window.
        lock: Serialises callers of ``acquire``.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        """Store the limits.

        Raises:
            ValueError: If ``max_requests`` is less than 1; such a limiter
                could never grant a request.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = []
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait if rate limit exceeded."""
        async with self.lock:
            while True:
                now = datetime.now()

                # Remove old requests
                self.requests = [
                    req_time
                    for req_time in self.requests
                    if now - req_time < self.time_window
                ]

                if len(self.requests) < self.max_requests:
                    break

                # Wait if limit reached, then loop: the clock is re-read and
                # expired entries are pruned again, so the timestamp recorded
                # below is the real acquisition time rather than the stale
                # pre-sleep one, and the window never holds more than
                # max_requests entries.
                oldest = min(self.requests)
                wait_time = (oldest + self.time_window - now).total_seconds()
                if wait_time > 0:
                    logger.debug("Rate limit reached, waiting", wait_seconds=wait_time)
                    await asyncio.sleep(wait_time)

            self.requests.append(now)