Not sure how to delete this previous message but m...
# community-help
s
#!/usr/bin/env python3
"""Back up Typesense data to a Cloudflare R2 bucket.

Triggers a Typesense snapshot via the /operations/snapshot API, polls the
host-mounted snapshot directory until the snapshot data is fully written,
archives it with tar, uploads the archive to an S3-compatible (Cloudflare R2)
bucket, and cleans up local artifacts afterwards.

NOTE(review): this script was recovered from a chat paste in which Markdown
rendering stripped dunder underscores (`__file__`, `__name__`) and redacted
the `{filename}` log placeholders; those have been restored below.
"""
import os
import sys
import requests
import boto3
import shutil
import subprocess
import logging
import time
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv

load_dotenv()

# Retrieve environment variables
TYPESENSE_HOST = os.getenv('TYPESENSE_HOST', 'http://localhost:8108')
TYPESENSE_API_KEY = os.getenv('TYPESENSE_API_KEY')
R2_ENDPOINT = os.getenv('R2_ENDPOINT')
R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY')
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY')
R2_BUCKET = os.getenv('R2_BUCKET')
# Directory to store the tar archive (host path)
BACKUP_LOCATION = os.getenv('BACKUP_LOCATION')
# Dedicated backups directory (host path) for snapshot data
HOST_SNAPSHOT_DIR = os.getenv('HOST_SNAPSHOT_DIR', '/home/user/services/typesense/backups')
# Container path for snapshot data (Typesense will write snapshots here)
CONTAINER_SNAPSHOT_DIR = os.getenv('CONTAINER_SNAPSHOT_DIR', '/backups')

REQUIRED_ENV_VARS = [
    'TYPESENSE_API_KEY',
    'R2_ENDPOINT',
    'R2_ACCESS_KEY',
    'R2_SECRET_KEY',
    'R2_BUCKET',
]
missing_vars = [var for var in REQUIRED_ENV_VARS if not os.getenv(var)]
if missing_vars:
    print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
    sys.exit(1)

# Determine current directory and backup location (host path).
# Fix: the pasted script read `Path(file)` — `__file__` is the intended name.
current_dir = Path(__file__).parent
backup_location = Path(BACKUP_LOCATION) if BACKUP_LOCATION else current_dir
backup_location.mkdir(parents=True, exist_ok=True)

# Ensure the host snapshot (backups) directory exists
host_snapshot_dir = Path(HOST_SNAPSHOT_DIR)
host_snapshot_dir.mkdir(parents=True, exist_ok=True)

# Set up logging
log_dir = current_dir / 'logs'
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / 'backup.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def get_snapshot_directory():
    """
    Use the HOST_SNAPSHOT_DIR as the snapshot directory.
    Clear its contents before use (ensure this directory is dedicated for snapshots).
    """
    snapshot_dir = host_snapshot_dir
    # NOTE(review): if this path is itself a Docker bind-mount point, removing
    # the directory itself (not just its contents) may fail — confirm on the host.
    if snapshot_dir.exists():
        shutil.rmtree(snapshot_dir)
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Using host snapshot directory: {snapshot_dir}")
    return snapshot_dir


def create_snapshot():
    """Create a Typesense snapshot and tar it up.

    Returns:
        (backup_file, snapshot_dir): path of the created .tar.gz archive and
        the snapshot directory that was archived.

    Raises:
        requests.RequestException: if the snapshot API call fails.
        subprocess.CalledProcessError: if the tar command fails.
        Exception: on API-reported failure or polling timeout.
    """
    try:
        headers = {
            'Content-Type': 'application/json',
            'X-TYPESENSE-API-KEY': TYPESENSE_API_KEY
        }
        # Use the host snapshot (backups) directory
        snapshot_dir = get_snapshot_directory()
        # Call the snapshot API with the container path
        params = {
            'snapshot_path': CONTAINER_SNAPSHOT_DIR
        }
        logger.info(f"Calling snapshot API with snapshot_path={CONTAINER_SNAPSHOT_DIR}")
        response = requests.post(
            f"{TYPESENSE_HOST}/operations/snapshot",
            headers=headers,
            params=params,
            timeout=30  # fail fast instead of hanging forever on a dead host
        )
        response.raise_for_status()
        if not response.json().get('success'):
            raise Exception("Snapshot creation failed")

        logger.info("Snapshot API returned success. Waiting for snapshot data to be written...")
        # Increased timeout to 180 seconds in case snapshot takes longer.
        timeout = 180  # seconds
        poll_interval = 2  # seconds
        elapsed = 0

        # First, wait for the expected subdirectory ('state') to appear.
        state_dir = snapshot_dir / 'state'
        while not state_dir.exists() and elapsed < timeout:
            logger.info("Waiting for 'state' subdirectory to appear in snapshot directory...")
            time.sleep(poll_interval)
            elapsed += poll_interval
        if not state_dir.exists():
            raise Exception("Timeout waiting for 'state' directory to be created.")

        logger.info(f"'state' subdirectory detected at {state_dir}. Beginning to poll for snapshot data completeness...")
        previous_size = -1
        stable_iterations = 0
        # Reset elapsed for data polling stability check.
        elapsed = 0
        while elapsed < timeout:
            files = list(state_dir.rglob("*"))
            total_size = sum(f.stat().st_size for f in files if f.is_file())
            logger.info(f"Polling 'state' directory: found {len(files)} file(s), total size: {total_size} bytes.")
            if total_size > 1024:  # Expect at least 1KB of data (adjust threshold if necessary)
                if total_size == previous_size:
                    stable_iterations += 1
                    logger.info(f"Total size stable for {stable_iterations} iteration(s).")
                else:
                    stable_iterations = 0
                if stable_iterations >= 2:
                    # Data size is stable across two consecutive polls.
                    logger.info(f"Snapshot data appears complete and stable (total size: {total_size} bytes).")
                    break
            previous_size = total_size
            time.sleep(poll_interval)
            elapsed += poll_interval
        else:
            # Loop ran to exhaustion without a `break` → snapshot never stabilised.
            raise Exception("Timeout waiting for snapshot data to be fully written.")

        # Prepare the backup file path outside the snapshot directory.
        timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
        backup_filename = f"typesense-backup-{timestamp}.tar.gz"
        backup_file = backup_location / backup_filename
        logger.info(f"Creating tar archive: {backup_file}")
        subprocess.run([
            'tar', '-czf', str(backup_file),
            '-C', str(snapshot_dir), '.'
        ], check=True)
        return backup_file, snapshot_dir
    except requests.RequestException as e:
        logger.error(f"HTTP request failed: {str(e)}")
        raise
    except subprocess.CalledProcessError as e:
        logger.error(f"Tar command failed: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during snapshot: {str(e)}")
        raise


def upload_to_r2(file_path):
    """Upload the given archive to the configured R2 bucket.

    Args:
        file_path: Path of the local .tar.gz archive to upload.

    Raises:
        Exception: re-raised after logging if the upload fails.
    """
    try:
        logger.info("Initialising R2 client")
        s3_client = boto3.client(
            's3',
            endpoint_url=R2_ENDPOINT,
            aws_access_key_id=R2_ACCESS_KEY,
            aws_secret_access_key=R2_SECRET_KEY,
            region_name='auto',  # R2 uses the literal region name 'auto'
            config=boto3.session.Config(
                signature_version='s3v4',
                retries={'max_attempts': 3},
            )
        )
        filename = file_path.name
        # Fix: log messages in the paste showed "(unknown)" where the
        # f-string placeholder was lost; restored the filename interpolation.
        logger.info(f"Starting upload of {filename} to R2")
        s3_client.upload_file(str(file_path), R2_BUCKET, filename)
        logger.info(f"Successfully uploaded {filename} to R2")
    except Exception as e:
        logger.error(f"Upload failed: {str(e)}")
        raise


def cleanup(backup_file, snapshot_dir):
    """Best-effort removal of the local archive and snapshot directory.

    Failures are logged but never raised, so cleanup cannot mask the
    real outcome of the backup run.
    """
    try:
        if backup_file and backup_file.exists():
            backup_file.unlink()
            logger.info(f"Cleaned up backup file: {backup_file}")
        if snapshot_dir and snapshot_dir.exists():
            shutil.rmtree(snapshot_dir)
            logger.info(f"Cleaned up snapshot directory: {snapshot_dir}")
    except Exception as e:
        logger.error(f"Cleanup failed: {str(e)}")


def main():
    """Run the full backup pipeline: snapshot → archive → upload → cleanup."""
    backup_file = None
    snapshot_dir = None
    try:
        logger.info("Starting backup process")
        backup_file, snapshot_dir = create_snapshot()
        upload_to_r2(backup_file)
        logger.info("Backup completed successfully")
    except Exception as e:
        logger.error(f"Backup failed: {str(e)}")
        sys.exit(1)
    finally:
        # Always attempt cleanup, even after a failure part-way through.
        if backup_file or snapshot_dir:
            cleanup(backup_file, snapshot_dir)


if __name__ == "__main__":
    main()