Skip to content

Commit

Permalink
Add work with remote host (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
demonolock authored Aug 4, 2023
1 parent 09e9f01 commit 6cb3a80
Show file tree
Hide file tree
Showing 19 changed files with 2,321 additions and 244 deletions.
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,33 @@ with testgres.get_new_node().init() as master:
Note that `default_conf()` is called by `init()` function; both of them overwrite
the configuration file, which means that they should be called before `append_conf()`.

### Remote mode
Testgres supports the creation of PostgreSQL nodes on a remote host. This is useful when you want to run distributed tests involving multiple nodes spread across different machines.

To use this feature, you need to use the RemoteOperations class.
Here is an example of how you might set this up:

```python
from testgres import ConnectionParams, RemoteOperations, TestgresConfig, get_remote_node

# Set up connection params
conn_params = ConnectionParams(
host='your_host', # replace with your host
username='user_name', # replace with your username
ssh_key='path_to_ssh_key' # replace with your SSH key path
)
os_ops = RemoteOperations(conn_params)

# Add remote testgres config before test
TestgresConfig.set_os_ops(os_ops=os_ops)

# Proceed with your test
def test_basic_query(self):
with get_remote_node(conn_params=conn_params) as node:
node.init().start()
res = node.execute('SELECT 1')
self.assertEqual(res, [(1,)])
```

## Authors

Expand Down
7 changes: 5 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"six>=1.9.0",
"psutil",
"packaging",
"paramiko",
"fabric",
"sshtunnel"
]

# Add compatibility enum class
Expand All @@ -27,9 +30,9 @@
readme = f.read()

setup(
version='1.8.9',
version='1.9.0',
name='testgres',
packages=['testgres'],
packages=['testgres', 'testgres.operations'],
description='Testing utility for PostgreSQL and its extensions',
url='https://github.com/postgrespro/testgres',
long_description=readme,
Expand Down
8 changes: 7 additions & 1 deletion testgres/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .api import get_new_node
from .api import get_new_node, get_remote_node
from .backup import NodeBackup

from .config import \
Expand Down Expand Up @@ -46,8 +46,13 @@
First, \
Any

from .operations.os_ops import OsOperations, ConnectionParams
from .operations.local_ops import LocalOperations
from .operations.remote_ops import RemoteOperations

__all__ = [
"get_new_node",
"get_remote_node",
"NodeBackup",
"TestgresConfig", "configure_testgres", "scoped_config", "push_config", "pop_config",
"NodeConnection", "DatabaseError", "InternalError", "ProgrammingError", "OperationalError",
Expand All @@ -56,4 +61,5 @@
"PostgresNode", "NodeApp",
"reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version",
"First", "Any",
"OsOperations", "LocalOperations", "RemoteOperations", "ConnectionParams"
]
12 changes: 12 additions & 0 deletions testgres/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ def get_new_node(name=None, base_dir=None, **kwargs):
"""
# NOTE: leave explicit 'name' and 'base_dir' for compatibility
return PostgresNode(name=name, base_dir=base_dir, **kwargs)


def get_remote_node(name=None, conn_params=None):
"""
Simply a wrapper around :class:`.PostgresNode` constructor for remote node.
See :meth:`.PostgresNode.__init__` for details.
For remote connection you can add the next parameter:
conn_params = ConnectionParams(host='127.0.0.1',
ssh_key=None,
username=default_username())
"""
return get_new_node(name=name, conn_params=conn_params)
18 changes: 7 additions & 11 deletions testgres/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

import os

from shutil import rmtree, copytree
from six import raise_from
from tempfile import mkdtemp

from .enums import XLogMethod

Expand All @@ -15,8 +13,6 @@
PG_CONF_FILE, \
BACKUP_LOG_FILE

from .defaults import default_username

from .exceptions import BackupException

from .utils import \
Expand Down Expand Up @@ -47,7 +43,7 @@ def __init__(self,
username: database user name.
xlog_method: none | fetch | stream (see docs)
"""

self.os_ops = node.os_ops
if not node.status():
raise BackupException('Node must be running')

Expand All @@ -60,8 +56,8 @@ def __init__(self,
raise BackupException(msg)

# Set default arguments
username = username or default_username()
base_dir = base_dir or mkdtemp(prefix=TMP_BACKUP)
username = username or self.os_ops.get_user()
base_dir = base_dir or self.os_ops.mkdtemp(prefix=TMP_BACKUP)

# public
self.original_node = node
Expand Down Expand Up @@ -107,14 +103,14 @@ def _prepare_dir(self, destroy):
available = not destroy

if available:
dest_base_dir = mkdtemp(prefix=TMP_NODE)
dest_base_dir = self.os_ops.mkdtemp(prefix=TMP_NODE)

data1 = os.path.join(self.base_dir, DATA_DIR)
data2 = os.path.join(dest_base_dir, DATA_DIR)

try:
# Copy backup to new data dir
copytree(data1, data2)
self.os_ops.copytree(data1, data2)
except Exception as e:
raise_from(BackupException('Failed to copy files'), e)
else:
Expand Down Expand Up @@ -143,7 +139,7 @@ def spawn_primary(self, name=None, destroy=True):

# Build a new PostgresNode
NodeClass = self.original_node.__class__
with clean_on_error(NodeClass(name=name, base_dir=base_dir)) as node:
with clean_on_error(NodeClass(name=name, base_dir=base_dir, conn_params=self.original_node.os_ops.conn_params)) as node:

# New nodes should always remove dir tree
node._should_rm_dirs = True
Expand Down Expand Up @@ -185,4 +181,4 @@ def cleanup(self):

if self._available:
self._available = False
rmtree(self.base_dir, ignore_errors=True)
self.os_ops.rmdirs(self.base_dir, ignore_errors=True)
21 changes: 12 additions & 9 deletions testgres/cache.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
# coding: utf-8

import io
import os

from shutil import copytree
from six import raise_from

from .config import testgres_config
Expand All @@ -20,12 +18,16 @@
get_bin_path, \
execute_utility

from .operations.local_ops import LocalOperations
from .operations.os_ops import OsOperations

def cached_initdb(data_dir, logfile=None, params=None):

def cached_initdb(data_dir, logfile=None, params=None, os_ops: OsOperations = LocalOperations()):
"""
Perform initdb or use cached node files.
"""
def call_initdb(initdb_dir, log=None):

def call_initdb(initdb_dir, log=logfile):
try:
_params = [get_bin_path("initdb"), "-D", initdb_dir, "-N"]
execute_utility(_params + (params or []), log)
Expand All @@ -39,22 +41,23 @@ def call_initdb(initdb_dir, log=None):
cached_data_dir = testgres_config.cached_initdb_dir

# Initialize cached initdb
if not os.path.exists(cached_data_dir) or \
not os.listdir(cached_data_dir):

if not os_ops.path_exists(cached_data_dir) or \
not os_ops.listdir(cached_data_dir):
call_initdb(cached_data_dir)

try:
# Copy cached initdb to current data dir
copytree(cached_data_dir, data_dir)
os_ops.copytree(cached_data_dir, data_dir)

# Assign this node a unique system id if asked to
if testgres_config.cached_initdb_unique:
# XXX: write new unique system id to control file
# Some users might rely upon unique system ids, but
# our initdb caching mechanism breaks this contract.
pg_control = os.path.join(data_dir, XLOG_CONTROL_FILE)
with io.open(pg_control, "r+b") as f:
f.write(generate_system_id()) # overwrite id
system_id = generate_system_id()
os_ops.write(pg_control, system_id, truncate=True, binary=True, read_and_write=True)

# XXX: build new WAL segment with our system id
_params = [get_bin_path("pg_resetwal"), "-D", data_dir, "-f"]
Expand Down
17 changes: 13 additions & 4 deletions testgres/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import tempfile

from contextlib import contextmanager
from shutil import rmtree
from tempfile import mkdtemp

from .consts import TMP_CACHE
from .operations.os_ops import OsOperations
from .operations.local_ops import LocalOperations


class GlobalConfig(object):
Expand Down Expand Up @@ -43,6 +43,9 @@ class GlobalConfig(object):

_cached_initdb_dir = None
""" underlying class attribute for cached_initdb_dir property """

os_ops = LocalOperations()
""" OsOperation object that allows work on remote host """
@property
def cached_initdb_dir(self):
""" path to a temp directory for cached initdb. """
Expand All @@ -54,6 +57,7 @@ def cached_initdb_dir(self, value):

if value:
cached_initdb_dirs.add(value)
return testgres_config.cached_initdb_dir

@property
def temp_dir(self):
Expand Down Expand Up @@ -118,6 +122,11 @@ def copy(self):

return copy.copy(self)

@staticmethod
def set_os_ops(os_ops: OsOperations):
testgres_config.os_ops = os_ops
testgres_config.cached_initdb_dir = os_ops.mkdtemp(prefix=TMP_CACHE)


# cached dirs to be removed
cached_initdb_dirs = set()
Expand All @@ -135,7 +144,7 @@ def copy(self):
@atexit.register
def _rm_cached_initdb_dirs():
for d in cached_initdb_dirs:
rmtree(d, ignore_errors=True)
testgres_config.os_ops.rmdirs(d, ignore_errors=True)


def push_config(**options):
Expand Down Expand Up @@ -198,4 +207,4 @@ def configure_testgres(**options):


# NOTE: assign initial cached dir for initdb
testgres_config.cached_initdb_dir = mkdtemp(prefix=TMP_CACHE)
testgres_config.cached_initdb_dir = testgres_config.os_ops.mkdtemp(prefix=TMP_CACHE)
15 changes: 7 additions & 8 deletions testgres/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ def __init__(self,

self._node = node

self._connection = pglib.connect(database=dbname,
user=username,
password=password,
host=node.host,
port=node.port)
self._connection = node.os_ops.db_connect(dbname=dbname,
user=username,
password=password,
host=node.host,
port=node.port)

self._connection.autocommit = autocommit
self._cursor = self.connection.cursor()
Expand Down Expand Up @@ -103,16 +103,15 @@ def rollback(self):

def execute(self, query, *args):
self.cursor.execute(query, args)

try:
res = self.cursor.fetchall()

# pg8000 might return tuples
if isinstance(res, tuple):
res = [tuple(t) for t in res]

return res
except Exception:
except Exception as e:
print("Error executing query: {}".format(e))
return None

def close(self):
Expand Down
9 changes: 4 additions & 5 deletions testgres/defaults.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime
import getpass
import os
import struct
import uuid

from .config import testgres_config as tconf


def default_dbname():
"""
Expand All @@ -17,8 +17,7 @@ def default_username():
"""
Return default username (current user).
"""

return getpass.getuser()
return tconf.os_ops.get_user()


def generate_app_name():
Expand All @@ -44,7 +43,7 @@ def generate_system_id():
system_id = 0
system_id |= (secs << 32)
system_id |= (usecs << 12)
system_id |= (os.getpid() & 0xFFF)
system_id |= (tconf.os_ops.get_pid() & 0xFFF)

# pack ULL in native byte order
return struct.pack('=Q', system_id)
Loading

0 comments on commit 6cb3a80

Please sign in to comment.