I'm not sure how to delete my previous message, but my problem is solved now — I believe it was down to file permissions.
In case it helps anyone else, this is the script I've created to sync Typesense data to a Cloudflare R2 bucket:
#!/usr/bin/env python3
import os
import sys
import requests
import boto3
import shutil
import subprocess
import logging
import time
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()

# --- Configuration from environment variables ---
# FIX: the default URL literal was split across two lines in the paste,
# leaving an unterminated string; restored to a single line.
TYPESENSE_HOST = os.getenv('TYPESENSE_HOST', 'http://localhost:8108')
TYPESENSE_API_KEY = os.getenv('TYPESENSE_API_KEY')
R2_ENDPOINT = os.getenv('R2_ENDPOINT')
R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY')
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY')
R2_BUCKET = os.getenv('R2_BUCKET')
# Directory to store the tar archive (host path)
BACKUP_LOCATION = os.getenv('BACKUP_LOCATION')
# Dedicated backups directory (host path) for snapshot data
HOST_SNAPSHOT_DIR = os.getenv('HOST_SNAPSHOT_DIR', '/home/user/services/typesense/backups')
# Container path for snapshot data (Typesense will write snapshots here)
CONTAINER_SNAPSHOT_DIR = os.getenv('CONTAINER_SNAPSHOT_DIR', '/backups')

# Variables without sensible defaults; the script refuses to run without them.
REQUIRED_ENV_VARS = [
    'TYPESENSE_API_KEY',
    'R2_ENDPOINT',
    'R2_ACCESS_KEY',
    'R2_SECRET_KEY',
    'R2_BUCKET',
]

missing_vars = [var for var in REQUIRED_ENV_VARS if not os.getenv(var)]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    sys.exit(1)

# Determine current directory and backup location (host path).
# FIX: the paste mangled `Path(__file__)` into `Path(file)`.
current_dir = Path(__file__).parent
backup_location = Path(BACKUP_LOCATION) if BACKUP_LOCATION else current_dir
backup_location.mkdir(parents=True, exist_ok=True)

# Ensure the host snapshot (backups) directory exists
host_snapshot_dir = Path(HOST_SNAPSHOT_DIR)
host_snapshot_dir.mkdir(parents=True, exist_ok=True)

# Set up logging to both a log file and the console.
log_dir = current_dir / 'logs'
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / 'backup.log'),
        logging.StreamHandler()
    ]
)
# FIX: the paste mangled `getLogger(__name__)` into `getLogger(name)`.
logger = logging.getLogger(__name__)
def get_snapshot_directory():
    """Return the dedicated host snapshot directory, emptied and recreated.

    The directory configured via HOST_SNAPSHOT_DIR is wiped before use, so
    it must be dedicated exclusively to Typesense snapshots.
    """
    target = host_snapshot_dir
    # Remove leftovers from any previous run so the new snapshot starts clean.
    if target.exists():
        shutil.rmtree(target)
    target.mkdir(parents=True, exist_ok=True)
    logger.info(f"Using host snapshot directory: {target}")
    return target
def create_snapshot():
    """Create a Typesense snapshot and package it into a .tar.gz archive.

    Calls the Typesense snapshot API with the container-side path
    (CONTAINER_SNAPSHOT_DIR); the snapshot data lands in the host-side
    HOST_SNAPSHOT_DIR — presumably via a container bind mount (confirm
    against the compose/run config). Polls until the snapshot's 'state'
    directory appears and its total size is stable, then tars the snapshot
    directory into the backup location.

    Returns:
        tuple[Path, Path]: (backup_file, snapshot_dir) — the created tar
        archive and the snapshot directory it was built from.

    Raises:
        requests.RequestException: the snapshot API call failed.
        subprocess.CalledProcessError: the tar command failed.
        Exception: the API reported failure, or a polling timeout expired.
    """
    try:
        headers = {
            'Content-Type': 'application/json',
            'X-TYPESENSE-API-KEY': TYPESENSE_API_KEY
        }
        # Use the host snapshot (backups) directory
        snapshot_dir = get_snapshot_directory()
        # Call the snapshot API with the container path
        params = {
            'snapshot_path': CONTAINER_SNAPSHOT_DIR
        }
        logger.info(f"Calling snapshot API with snapshot_path={CONTAINER_SNAPSHOT_DIR}")
        response = requests.post(
            f"{TYPESENSE_HOST}/operations/snapshot",
            headers=headers,
            params=params
        )
        response.raise_for_status()
        if not response.json().get('success'):
            raise Exception("Snapshot creation failed")
        logger.info("Snapshot API returned success. Waiting for snapshot data to be written...")
        # Increased timeout to 180 seconds in case snapshot takes longer.
        timeout = 180  # seconds
        poll_interval = 2  # seconds
        elapsed = 0
        # First, wait for the expected subdirectory ('state') to appear.
        state_dir = snapshot_dir / 'state'
        while not state_dir.exists() and elapsed < timeout:
            logger.info("Waiting for 'state' subdirectory to appear in snapshot directory...")
            time.sleep(poll_interval)
            elapsed += poll_interval
        if not state_dir.exists():
            raise Exception("Timeout waiting for 'state' directory to be created.")
        logger.info(f"'state' subdirectory detected at {state_dir}. Beginning to poll for snapshot data completeness...")
        previous_size = -1
        stable_iterations = 0
        # Reset elapsed for data polling stability check.
        elapsed = 0
        while elapsed < timeout:
            # Recursive walk: sum the size of every regular file under 'state'.
            files = list(state_dir.rglob("*"))
            total_size = sum(f.stat().st_size for f in files if f.is_file())
            logger.info(f"Polling 'state' directory: found {len(files)} file(s), total size: {total_size} bytes.")
            if total_size > 1024:  # Expect at least 1KB of data (adjust threshold if necessary)
                if total_size == previous_size:
                    stable_iterations += 1
                    logger.info(f"Total size stable for {stable_iterations} iteration(s).")
                else:
                    stable_iterations = 0
                if stable_iterations >= 2:  # Data size is stable across two consecutive polls.
                    logger.info(f"Snapshot data appears complete and stable (total size: {total_size} bytes).")
                    break
            previous_size = total_size
            time.sleep(poll_interval)
            elapsed += poll_interval
        else:
            # while/else: reached only when the loop exhausts the timeout
            # without hitting the `break` above.
            raise Exception("Timeout waiting for snapshot data to be fully written.")
        # Prepare the backup file path outside the snapshot directory.
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        backup_filename = f"typesense-backup-{timestamp}.tar.gz"
        backup_file = backup_location / backup_filename
        logger.info(f"Creating tar archive: {backup_file}")
        # Archive the snapshot directory's contents ('.') so the tarball
        # does not embed the absolute host path.
        subprocess.run([
            'tar', '-czf', str(backup_file),
            '-C', str(snapshot_dir), '.'
        ], check=True)
        return backup_file, snapshot_dir
    except requests.RequestException as e:
        logger.error(f"HTTP request failed: {str(e)}")
        raise
    except subprocess.CalledProcessError as e:
        logger.error(f"Tar command failed: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during snapshot: {str(e)}")
        raise
def upload_to_r2(file_path):
    """Upload the given backup archive to the configured Cloudflare R2 bucket.

    Args:
        file_path: Path to the local file to upload; its basename becomes
            the object key in the bucket.

    Raises:
        Exception: re-raised after logging if client setup or upload fails.
    """
    try:
        logger.info("Initialising R2 client")
        # R2 is S3-compatible; it requires SigV4 signing and region 'auto'.
        # NOTE(review): `boto3.session.Config` may need to be
        # `botocore.config.Config` — confirm against the boto3 version in use.
        s3_client = boto3.client(
            's3',
            endpoint_url=R2_ENDPOINT,
            aws_access_key_id=R2_ACCESS_KEY,
            aws_secret_access_key=R2_SECRET_KEY,
            region_name='auto',
            config=boto3.session.Config(
                signature_version='s3v4',
                retries={'max_attempts': 3},
            )
        )
        filename = file_path.name
        # FIX: the pasted log lines said "(unknown)" — interpolate the actual
        # filename so the log messages are useful.
        logger.info(f"Starting upload of {filename} to R2")
        s3_client.upload_file(str(file_path), R2_BUCKET, filename)
        logger.info(f"Successfully uploaded {filename} to R2")
    except Exception as e:
        logger.error(f"Upload failed: {str(e)}")
        raise
def cleanup(backup_file, snapshot_dir):
    """Best-effort removal of the local tar archive and snapshot directory.

    Failures are logged but never propagated, so cleanup cannot mask the
    real outcome of the backup run.
    """
    try:
        archive_present = backup_file and backup_file.exists()
        if archive_present:
            backup_file.unlink()
            logger.info(f"Cleaned up backup file: {backup_file}")
        snapshot_present = snapshot_dir and snapshot_dir.exists()
        if snapshot_present:
            shutil.rmtree(snapshot_dir)
            logger.info(f"Cleaned up snapshot directory: {snapshot_dir}")
    except Exception as err:
        logger.error(f"Cleanup failed: {str(err)}")
def main():
    """Run the full backup: create a snapshot, upload it, then clean up."""
    archive_path = None
    snapshot_path = None
    try:
        logger.info("Starting backup process")
        archive_path, snapshot_path = create_snapshot()
        upload_to_r2(archive_path)
        logger.info("Backup completed successfully")
    except Exception as e:
        logger.error(f"Backup failed: {str(e)}")
        sys.exit(1)
    finally:
        # Always remove local artefacts, even when the run failed partway.
        if archive_path or snapshot_path:
            cleanup(archive_path, snapshot_path)
# FIX: the paste split `__name__` across two lines; restore the standard
# script entry-point guard.
if __name__ == "__main__":
    main()