from pyqchem.qc_input import QchemInput
from pyqchem.errors import ParserError, OutputError
from pyqchem.utils import get_sdm
from subprocess import Popen, PIPE
from pathlib import Path
import os, shutil, sys
import numpy as np
import hashlib
import pickle
import warnings
if sys.version_info[0] < 3 or sys.platform in ["win32", "cygwin"] or os.getenv('PYQCHEM_CACHE') == '1':
# For python 2.x or Windows use pickle based cache system
from pyqchem.cache import SimpleCache as CacheSystem
warnings.warn('Using SimpleCache')
else:
# For python 3.x use SQL Database based cache system
from pyqchem.cache import SqlCache as CacheSystem
# Backwards Compatibility
def redefine_calculation_data_filename(filename, compress=False):
cache = CacheSystem(filename=filename, compress=compress)
# Check if calculation finished ok
def finish_ok(output):
return output[-1000:].find('Thank you very much for using Q-Chem') != -1
# Return qchem version
def get_version_output(output):
class QChemVersion:
def __init__(self, string):
string_version = string.split()[1]
self._major = string_version.split('.')[0]
self._minor = string_version.split('.')[1]
string_branch = string.split()[2]
self._devel = True if '(devel)' in string_branch else False
def __str__(self):
dev = 'dev' if self.is_development else ''
return '{}.{} {}'.format(self.major, self.minor, dev)
def __eq__(self, other):
"""
Here put the logic for more sophisticated comparison
between versions
:param other: string/QchemVersion
:return:
"""
if isinstance(other, QChemVersion):
return True if self.__str__() == other.__str__() else False
o_major = other.split('.')[0]
o_minor = other.split('.')[1]
if int(o_major) == self.major:
# handle expresions like 2.3+
if '+' in o_minor[-1]:
if self.minor >= int(o_minor[:-1]):
return True
else:
return False
if int(o_minor) == self.minor:
return True
return False
@property
def major(self):
return int(self._major)
@property
def minor(self):
return int(self._minor)
@property
def is_development(self):
return self._devel
index = output[:500].find('\n Q-Chem')
string = output[index: index + 30]
return QChemVersion(string)
def get_compatibility_list_from_parser(parser):
docstring = parser.__doc__
if docstring is None:
return None
lines = docstring.split('\n')
for line in lines:
if 'compatibility' in line.lower():
try:
return [version.strip() for version in line.split(':')[1].split(',')]
except IndexError:
continue
return None
# Layer of compatibility with old version
def create_qchem_input(*args, **kwargs):
return QchemInput(*args, **kwargs)
[docs]def parse_output(get_output_function):
"""
to be deprecated
:param get_output_function:
:return: parsed output
"""
cache = CacheSystem()
def func_wrapper(*args, **kwargs):
parser = kwargs.pop('parser', None)
parser_parameters = kwargs.pop('parser_parameters', {})
store_output = kwargs.pop('store_output', None)
force_recalculation = kwargs.pop('force_recalculation', False)
if parser is not None:
hash_p = (args[0], parser.__name__)
if hash_p in cache.calculation_data and not force_recalculation:
print('already calculated. Skip')
return cache.calculation_data[hash_p]
output, err = get_output_function(*args, **kwargs)
if store_output is not None:
with open('{}'.format(store_output), 'w') as f:
f.write(output)
if len(err) > 0:
print(output[-800:])
print(err)
raise Exception('q-chem calculation finished with error')
if parser is None:
return output
parsed_output = parser(output, **parser_parameters)
cache.calculation_data[hash_p] = parsed_output
with open(cache._calculation_data_filename, 'wb') as output:
pickle.dump(cache.calculation_data, output, protocol=pickle.DEFAULT_PROTOCOL)
return parsed_output
return func_wrapper
[docs]def local_run(input_file_name, work_dir, fchk_file, use_mpi=False, processors=1):
"""
Run Q-Chem locally
:param input_file_name: Q-Chem input file in plain text format
:param work_dir: Scratch directory where calculation run
:param fchk_file: filename of fchk
:param use_mpi: use mpi instead of openmp
:return: output, err: Q-Chem standard output and standard error
"""
if not use_mpi:
os.environ["QCTHREADS"] = "{}".format(processors)
os.environ["OMP_NUM_THREADS"] = "{}".format(processors)
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["GUIFILE"] = fchk_file
qc_dir = os.getenv('QC')
exe_dir = os.getenv('QC_EXE_DIR') if 'QC_EXE_DIR' in os.environ else 'exe'
binary = Path(qc_dir).joinpath(exe_dir).joinpath('qcprog.exe')
command = [binary, Path(work_dir).joinpath(input_file_name), Path(work_dir)]
qchem_process = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE, cwd=work_dir)
(output, err) = qchem_process.communicate()
qchem_process.wait()
output = output.decode(errors='ignore')
err = err.decode()
return output, err
[docs]def local_run_stream(input_file_name, work_dir, fchk_file, use_mpi=False, processors=1, print_stream=True):
"""
Run Q-Chem locally
:param input_file_name: Q-Chem input file in plain text format
:param work_dir: Scratch directory where calculation run
:param fchk_file: filename of fchk
:param use_mpi: use mpi instead of openmp
:param print_stream: set True to print output stream during execution
:return: output, err: Q-Chem standard output and standard error
"""
if not use_mpi:
os.environ["QCTHREADS"] = "{}".format(processors)
os.environ["OMP_NUM_THREADS"] = "{}".format(processors)
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["GUIFILE"] = fchk_file
qc_dir = os.getenv('QC')
exe_dir = os.getenv('QC_EXE_DIR') if 'QC_EXE_DIR' in os.environ else 'exe'
binary = Path(qc_dir).joinpath(exe_dir).joinpath('qcprog.exe')
command = [binary, Path(work_dir).joinpath(input_file_name), Path(work_dir)]
qchem_process = Popen(command, stdout=PIPE, stdin=PIPE, stderr=PIPE, cwd=work_dir)
output = ''
err = ''
while True:
line_out = qchem_process.stdout.readline()
line_err = qchem_process.stderr.readline()
if not line_out and not line_err:
break
if print_stream:
print(line_out.strip().decode(errors='ignore'))
sys.stdout.flush()
output += line_out.decode(errors='ignore')
err += line_err.decode(errors='ignore')
return output, err
[docs]def remote_run(input_file_name, work_dir, fchk_file, remote_params, use_mpi=False, processors=1):
"""
Run Q-Chem remotely
:param input_file: Q-Chem input file in plain text format
:param work_dir: Scratch directory where calculation run
:param fchk_file: filename of fchk
:param remote_params: connection parameters for paramiko
:param use_mpi: use mpi instead of openmp
:return: output, err: Q-Chem standard output and standard error
"""
import paramiko
# get precommands
commands = remote_params.pop('precommand', [])
remote_scratch = remote_params.pop('remote_scratch', None)
# Setup SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(**remote_params)
ssh.get_transport()
sftp = ssh.open_sftp()
print('connected to {}..'.format(remote_params['hostname']))
# Define temp remote dir
_, stdout, _ = ssh.exec_command('pwd', get_pty=True)
if remote_scratch is None:
remote_scratch = stdout.read().decode().strip('\n').strip('\r')
remote_dir = os.path.join(remote_scratch, 'temp_pyqchem_remote')
# Create temp directory in remote machine
try:
sftp.mkdir(remote_dir)
except OSError:
pass
sftp.chdir(remote_dir)
# Copy all files in local workdir to remote machine
file_list = os.listdir(work_dir)
for file in file_list:
sftp.put(os.path.join(work_dir, file), '{}'.format(file))
flag = '-np' if use_mpi else '-nt'
# Define commands to run Q-Chem in remote machine
commands += ['cd {}'.format(remote_dir), # go to remote work dir
'qchem {} {} {}'.format(flag, processors, input_file_name)] # run qchem
# Execute command in remote machine
stdin, stdout, stderr = ssh.exec_command('bash -l -c "{}"'.format(';'.join(commands)), get_pty=True)
# Reformat output/error files
output = ''.join(stdout.readlines())
error = ''.join(stderr.readlines())
# get files and remove them from remote server
for file in sftp.listdir():
sftp.get(os.path.join(remote_dir, file), os.path.join(work_dir, file))
sftp.remove(os.path.join(remote_dir, file))
sftp.rmdir(remote_dir)
sftp.close()
ssh.close()
# Rename fchk file to match expected name
if input_file_name + '.fchk' in os.listdir(work_dir):
os.rename(os.path.join(work_dir, input_file_name + '.fchk'), os.path.join(work_dir, fchk_file))
return output, error
[docs]def generate_additional_files(input_qchem, work_dir):
"""
Generate additional files on scratch (work dir) for special calculations
:param input_qchem: QChem input object
:param work_dir: scratch directory
"""
# handle custom guess
if input_qchem.mo_coefficients is not None:
input_qchem.store_mo_file(work_dir)
if input_qchem.scf_density is not None:
input_qchem.store_density_file(work_dir)
# set scf energy if skip_scfman (to not break)
# TODO: now SCF energy is set to zero. This may not work for all features.
if input_qchem._skip_scfman:
if input_qchem.mo_coefficients is None:
raise Exception('Explicit MO guess has to be provided for scf_skip')
input_qchem.store_energy_file(work_dir)
# Write hessian
if input_qchem.hessian is not None:
input_qchem.store_hessian_file(work_dir)
# write RAS-GUESS
if input_qchem.ras_guess is not None:
input_qchem.store_ras_guess_file(work_dir)
[docs]def retrieve_additional_files(input_qchem, data_fchk, work_dir, scratch_read_level=0):
"""
retrieve data from files in scratch data (on development, currently for test only)
:param input_qchem: QChem input object
:param data_fchk: FCHK parsed dictionary
:param work_dir: scratch directory
:param scratch_read_level: defines what data to retrieve
:return: dictionary with additional data
"""
additional_data = {}
natom = len(input_qchem.molecule.get_coordinates())
file_list = os.listdir(work_dir)
# OLD_DIMENSIONS
if '819.0' in file_list:
with open(work_dir + '819.0', 'r') as f:
data = np.fromfile(f, dtype=np.int32)
norb_alpha, norb_beta = data[0:2]
norb = norb_alpha
nbas = norb # assumption
else:
norb = np.shape(data_fchk['coefficients']['alpha'])[0]
nbas = np.shape(data_fchk['coefficients']['alpha'])[1]
# MO_COEFS (Already in fchk) in internal order
if '53.0' in file_list and 'coefficients' in data_fchk:
with open(work_dir + '53.0', 'r') as f:
data = np.fromfile(f, dtype=float)
mo_alpha = data[:norb*nbas].reshape(-1, norb).tolist()
mo_beta = data[norb*nbas: 2*norb_beta*nbas].reshape(-1, norb_beta).tolist()
# additional_data['coefficients_internal'] = {'alpha': mo_alpha, 'beta': mo_beta}
# obtain the order indices between fchk order and Q-Chem internal order of basis functions
diff_square = get_sdm(data_fchk['coefficients']['alpha'], mo_alpha)
# get non-repeating indices
indices = []
for row in diff_square.T:
for i in np.argsort(row):
if i not in indices:
indices.append(int(i))
break
# indices = np.argmin(diff_square, axis=0).tolist()
# store q-chem index order for later use (e.g guess)
data_fchk['coefficients']['qchem_order'] = indices
else:
indices = list(range(nbas))
# FOCK_MATRIX
if '58.0' in file_list:
with open(work_dir + '58.0', 'r') as f:
data = np.fromfile(f, dtype=float)
fock_alpha = data[:nbas*nbas].reshape(-1, nbas)
fock_beta = data[nbas*nbas: 2*nbas*nbas].reshape(-1, nbas)
# set basis functions in fchk order
fock_alpha = fock_alpha[:, indices]
fock_alpha = fock_alpha[indices, :]
fock_beta = fock_beta[:, indices]
fock_beta = fock_beta[indices, :]
additional_data['fock_matrix'] = {'alpha': fock_alpha.tolist(), 'beta': fock_beta.tolist()}
if scratch_read_level == -1:
# FILE_ENERGY (Not really worth to read it)
if '99.0' in file_list:
with open(work_dir + '99.0', 'r') as f:
data = np.fromfile(f, dtype=float)
# FILE_DENSITY_MATRIX (Already in fchk)
if '54.0' in file_list:
with open(work_dir + '54.0', 'r') as f:
data = np.fromfile(f, dtype=float)
density_alpha = data[:nbas*nbas].reshape(-1, nbas)
density_beta = data[nbas*nbas: 2*nbas*nbas].reshape(-1, nbas)
# set basis functions in fchk order
density_alpha = density_alpha[:, indices]
density_alpha = density_alpha[indices, :]
density_beta = density_beta[:, indices]
density_beta = density_beta[indices, :]
additional_data['scf_density_internal'] = {'alpha': density_alpha.tolist(), 'beta': density_beta.tolist()}
# HESSIAN_MATRIX
if '132.0' in file_list:
with open(work_dir + '132.0', 'r') as f:
data = np.fromfile(f, dtype=float)
hessian = data.reshape(-1, natom*3)
additional_data['hessian'] = hessian.tolist()
# AO_INTS_DEBUG
if '21.0' in file_list:
with open(work_dir + '21.0', 'r') as f:
data = np.fromfile(f, dtype=float)
ao_integrals = data.reshape(-1, nbas, nbas, nbas)
# set basis functions in fchk order
ao_integrals = ao_integrals[:, :, :, indices]
ao_integrals = ao_integrals[:, :, indices, :]
ao_integrals = ao_integrals[:, indices, :, :]
ao_integrals = ao_integrals[indices, :, :, :]
additional_data['ao_integrals'] = ao_integrals.tolist()
if scratch_read_level > 0:
# FILE_RAS_AMP
if '704.0' in file_list:
with open(work_dir + '705.0', 'r') as f:
ras_energies = np.fromfile(f, dtype=float)
n_ras_roots = len(ras_energies)
with open(work_dir + '704.0', 'r') as f:
data = np.fromfile(f, dtype=float)
ras_amplitudes = data.reshape(n_ras_roots, -1)
additional_data['ras_amplitudes'] = ras_amplitudes.tolist()
return additional_data
[docs]def get_output_from_qchem(input_qchem,
processors=1,
use_mpi=False,
scratch=None,
read_fchk=False, # to be deprecated
return_electronic_structure=False,
parser=None,
parser_parameters=None,
force_recalculation=False,
fchk_only=False,
store_full_output=False,
delete_scratch=True,
remote=None,
scratch_read_level=0):
"""
Runs qchem and returns the output in the following format:
1) If return_electronic_structure is requested:
[output, parsed_fchk]
2) If return_electronic_structure is not requested:
[output]
Note: if parser is set then output contains a dictionary with the parsed info
else output contains the q-chem output in plain text
:param input_qchem: QcInput object containing the Q-Chem input
:param processors: number of threads/processors to use in the calculation
:param use_mpi: If False use OpenMP (threads) else use MPI (processors)
:param scratch: Full Q-Chem scratch directory path. If None read from $QCSCRATCH
:param return_electronic_structure: if True, returns the parsed FCHK file containing the electronic structure
:param read_fchk: same as return_electronic_structure (to be deprecated)
:param parser: function to use to parse the Q-Chem output
:param parser_parameters: additional parameters that parser function may have
:param force_recalculation: Force to recalculate even identical calculation has already performed
:param fchk_only: If true, returns electronic structure data from cache ignoring output (to be deprecated)
:param remote: dictionary containing the data for remote calculation (beta)
:param store_full_output: store full output in plain text in pkl file
:param delete_scratch: delete all scratch files when calculation is finished
:return: output [, electronic_structure]
"""
from pyqchem.parsers.parser_fchk import parser_fchk
cache = CacheSystem()
# back-compatibility layer
if read_fchk:
warnings.warn("'read_fchk' will be deprecated, use return_electronic_structure instead",
DeprecationWarning, stacklevel=2)
return_electronic_structure = read_fchk
if fchk_only:
warnings.warn("'fchk_only' option will be deprecated",
DeprecationWarning, stacklevel=2)
# Always generate fchk
if input_qchem.gui is None or input_qchem.gui < 1:
input_qchem.gui = 2
if scratch is None:
scratch = os.getenv('QCSCRATCH')
if scratch is None:
warnings.warn('QCSCRATCH environment variable not defined, using workdir')
scratch = '.'
work_dir = '{}/pyqchem_{}/'.format(scratch, os.getpid())
try:
os.mkdir(work_dir)
except OSError:
pass
# check if parameters is None
if parser_parameters is None:
parser_parameters = {}
# check if full output is stored
output = cache.retrieve_calculation_data(input_qchem, 'fullout')
#output, err = cache.calculation_data[(hash(input_qchem), 'fullout')] if (hash(input_qchem), 'fullout') in cache.calculation_data else [None, None]
elect_struct_data = cache.retrieve_calculation_data(input_qchem, 'fchk')
# check if repeated calculation
if not force_recalculation and not store_full_output: # store_full_output always force re-parsing
if parser is not None:
parsed_data = cache.retrieve_calculation_data(hash(input_qchem), parser.__name__)
if parsed_data is not None:
if return_electronic_structure:
return parsed_data, elect_struct_data
else:
return parsed_data
else:
if fchk_only and elect_struct_data is not None:
return output, elect_struct_data
# temp filenames generated in temp directory
fchk_filename = 'qchem_temp_{}.fchk'.format(os.getpid())
temp_filename = 'qchem_temp_{}.inp'.format(os.getpid())
# generate the input in TXT form
input_txt = input_qchem.get_txt()
qchem_input_file = open(os.path.join(work_dir, temp_filename), mode='w')
qchem_input_file.write(input_txt)
qchem_input_file.close()
# generate extra files in calculation directory
generate_additional_files(input_qchem, work_dir)
# Q-Chem calculation
if output is None or force_recalculation is True:
if remote is None:
output, err = local_run(temp_filename, work_dir, fchk_filename, use_mpi=use_mpi, processors=processors)
else:
output, err = remote_run(temp_filename, work_dir, fchk_filename, remote, use_mpi=use_mpi, processors=processors)
if not finish_ok(output):
raise OutputError(output, err)
# parse fchk file & and additional scratch dir files
if not os.path.isfile(os.path.join(work_dir, fchk_filename)):
warnings.warn('fchk not found! something may be wrong in calculation')
else:
with open(os.path.join(work_dir, fchk_filename)) as f:
fchk_txt = f.read()
elect_struct_data = parser_fchk(fchk_txt)
elect_struct_data.update(retrieve_additional_files(input_qchem,
elect_struct_data,
work_dir,
scratch_read_level))
cache.store_calculation_data(input_qchem, 'fchk', elect_struct_data)
if store_full_output:
cache.store_calculation_data(input_qchem, 'fullout', output)
if parser is not None:
# Check parser compatibility
version = get_version_output(output)
compatibility_list = get_compatibility_list_from_parser(parser)
if compatibility_list is not None:
if version not in compatibility_list:
warnings.warn('Parser "{}" may not be compatible with Q-Chem {}'.format(parser.__name__, version))
# minimum functionality for parser error capture
try:
output = parser(output, **parser_parameters)
except:
raise ParserError(parser.__name__, 'Undefined error', output)
cache.store_calculation_data(input_qchem, parser.__name__, output)
if delete_scratch:
shutil.rmtree(work_dir)
if return_electronic_structure:
return output, elect_struct_data
else:
return output
def get_input_hash(data):
return hashlib.md5(data.encode()).hexdigest()