// // File: hdbBinRepos.cpp // // Legal Notice: This file is part of the HadronZoo C++ Class Library. Copyright 2025 HadronZoo Project (http://www.hadronzoo.com) // // The HadronZoo C++ Class Library is free software: You can redistribute it, and/or modify it under the terms of the GNU Lesser General Public License, as published by the Free // Software Foundation, either version 3 of the License, or any later version. // // The HadronZoo C++ Class Library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR // A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License along with the HadronZoo C++ Class Library. If not, see http://www.gnu.org/licenses. //
#include <iostream>
#include <fstream>
#include <cstdio>
#include <vector>

using namespace std ;

#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>

//#include "hzChain.h"
#include "hzDirectory.h"
#include "hzDatabase.h"
/* ** Variables */
/* ** SECTION 1: hdbBinRepos Functions */
hdbBinRepos::hdbBinRepos	(hdbADP& adp)
{
	//	Construct an uninitialized binary repository bound to the supplied hdbADP. The repository cannot be used until Init() and then Open() have been called.
	//
	//	Arguments:	1)	adp	The hdbADP instance with which Init() will register this repository

	m_pADP = &adp ;
	m_nSeqId = 0 ;
	m_nPopulation = 0 ;
	m_nSize = 0 ;		//	Zeroed here so the data size is defined even before Init() has been called
	m_nInitState = 0 ;

	_hzGlobal_Memstats.m_numBincron++ ;
}
hdbBinRepos::~hdbBinRepos	(void)
{
	//	Destroy the repository. If it is still open (init state 2) its file streams are closed first via Close().

	const bool	bStillOpen = (m_nInitState == 2) ;

	if (bStillOpen)
	{
		Close() ;
	}

	_hzGlobal_Memstats.m_numBincron-- ;
}
/* ** hdbBinRepos Init and Halt Functions */
hzEcode	hdbBinRepos::Init	(const hzString& name, const hzString& opdir)
{
	//	Initialize the hdbBinRepos instance. This ensures the working directory exists, sets the index and data file pathnames, derives the datum count from the
	//	index file size and the data size from the data file size, and registers the repository with the hdbADP.
	//
	//	Note that this function does not leave index or data files open. This is left to hdbBinRepos::Open() as a separate step.
	//
	//	Arguments:	1)	name	Name of object set to be stored in the hdbBinRepos (will be base of the two filenames)
	//				2)	opdir	Full pathname of operational directory
	//
	//	Returns:	E_INITDUP	If the repository has already been initialized
	//				E_ARGUMENT	If either the name or the working directory is not supplied
	//				E_DUPLICATE	If the hdbADP already has a binary repository of this name
	//				Other		Any error returned by AssertDir() or by hdbADP::RegisterBinRepos()
	//				E_OK		Operation was successful.
	//
	//	Errors:		Any return other than E_OK is due to an irrecoverable error. The calling function must check the return value. On error the init state is left at 0.

	_hzfunc("hdbBinRepos::Init") ;

	FSTAT	fs ;			//	File status
	hzEcode	rc = E_OK ;		//	Return code

	if (m_nInitState)
		return hzerr(E_INITDUP, "Resource already initialized") ;

	//	Check we have a name and working directory
	if (!name)
		return hzerr(E_ARGUMENT, "No name") ;

	if (!opdir)
		return hzerr(E_ARGUMENT, "Repository %s: No working directory", *name) ;

	//	Check we do not already have a datacron of the name
	if (m_pADP->GetBinRepos(name))
		return hzerr(E_DUPLICATE, "Binary datum cron %s already exists", *name) ;

	//	Assign internal structure, name and working directory
	m_Name = name ;
	m_Workdir = opdir ;
	threadLog("called with cron name %s and workdir %s\n", *m_Name, *m_Workdir) ;

	//	Assert working directory
	if (lstat(*m_Workdir, &fs) == -1)
	{
		rc = AssertDir(*m_Workdir, 0777) ;
		if (rc != E_OK)
			return hzerr(rc, "Could not assert working dir of %s", *m_Workdir) ;
	}

	//	Set the pathnames for the index and data file
	m_FileIndx = m_Workdir + "/" + m_Name + ".idx" ;
	m_FileData = m_Workdir + "/" + m_Name + ".dat" ;

	//	The index file is a flat array of fixed-size headers, so its size gives the number of datum issued so far
	if (lstat(*m_FileIndx, &fs) == -1)
		m_nSeqId = m_nPopulation = 0 ;
	else
	{
		if ((uint64_t) fs.st_size % sizeof(_datum_hd))
			threadLog("Warning: Index file %s size %llu is not a multiple of the header size %u - trailing partial header ignored\n",
				*m_FileIndx, (unsigned long long) fs.st_size, (unsigned) sizeof(_datum_hd)) ;
		m_nSeqId = m_nPopulation = fs.st_size / sizeof(_datum_hd) ;
	}

	if (lstat(*m_FileData, &fs) == -1)
		m_nSize = 0 ;
	else
		m_nSize = fs.st_size ;

	//	Register with the ADP. A registration failure is an error: previously it was logged but E_OK was still returned.
	threadLog("A Initialized %s (%p) count %u (size %llu) with err=%s\n", *m_Name, this, Count(), (unsigned long long) m_nSize, Err2Txt(rc)) ;
	rc = m_pADP->RegisterBinRepos(this) ;
	threadLog("B Initialized %s (%p) count %d with files %s and %s. err=%s\n", *m_Name, this, m_pADP->CountBinRepos(), *m_FileIndx, *m_FileData, Err2Txt(rc)) ;

	if (rc != E_OK)
		return hzerr(rc, "Could not register binary repository %s", *m_Name) ;

	m_nInitState = 1 ;
	return E_OK ;
}
hzEcode	hdbBinRepos::Open	(void)
{
	//	Open the hdbBinRepos instance.
	//
	//	This is a matter of opening an input and an output stream for both the index and the data files. The output streams are opened with ios::app as the
	//	repository only ever appends. If any stream fails to open, every stream opened so far is closed again, so the repository is left initialized-but-closed and
	//	Open() may be retried.
	//
	//	Arguments:	None
	//
	//	Returns:	E_NOINIT	If the repository has not been initialized
	//				E_SEQUENCE	If the repository is already open
	//				E_OPENFAIL	If either the index or data file cannot be opened for either reading or writing
	//				E_OK		If the operation was successful

	_hzfunc("hdbBinRepos::Open") ;

	hzEcode	rc = E_OK ;		//	Return code

	if (m_nInitState == 0)
		return hzerr(E_NOINIT, "Cannot open an uninitialized datacron") ;
	if (m_nInitState == 2)
		return hzerr(E_SEQUENCE, "Datacron %s is already open", *m_Name) ;

	//	Index writer
	m_WrI.open(*m_FileIndx, std::ios::app) ;
	if (m_WrI.fail())
		rc = hzerr(E_OPENFAIL, "Cannot open index file for writing: Repos %s", *m_FileIndx) ;
	else
		threadLog("Opened index file for writing: Repos %s\n", *m_FileIndx) ;

	//	Data writer
	if (rc == E_OK)
	{
		m_WrD.open(*m_FileData, std::ios::app) ;
		if (m_WrD.fail())
			rc = hzerr(E_OPENFAIL, "Could not open file (%s) for writing", *m_FileData) ;
		else
			threadLog("Opened data file for writing: Repos %s\n", *m_FileData) ;
	}

	//	Index reader
	if (rc == E_OK)
	{
		m_RdI.open(*m_FileIndx) ;
		if (m_RdI.fail())
			rc = hzerr(E_OPENFAIL, "Could not open index file (%s) for reading", *m_FileIndx) ;
		else
			threadLog("Opened index file for reading: Repos %s\n", *m_FileIndx) ;
	}

	//	Data reader
	if (rc == E_OK)
	{
		m_RdD.open(*m_FileData) ;
		if (m_RdD.fail())
			rc = hzerr(E_OPENFAIL, "Could not open data file (%s) for reading", *m_FileData) ;
		else
			threadLog("Opened data file for reading: Repos %s\n", *m_FileData) ;
	}

	if (rc != E_OK)
	{
		//	Undo the partial open. Otherwise a retry would call open() on streams that are still open, which fails, and the repository could never be opened.
		if (m_WrI.is_open())	m_WrI.close() ;
		if (m_WrD.is_open())	m_WrD.close() ;
		if (m_RdI.is_open())	m_RdI.close() ;
		if (m_RdD.is_open())	m_RdD.close() ;

		m_WrI.clear() ;
		m_WrD.clear() ;
		m_RdI.clear() ;
		m_RdD.clear() ;
		return rc ;
	}

	threadLog("INITIALIZED %s\n", *m_Name) ;
	m_nInitState = 2 ;

	return E_OK ;
}
hzEcode	hdbBinRepos::Close	(void)
{
	//	Close the binary repository. Each of the four file streams (index reader, data reader, index writer, data writer) is closed if open, and the repository is
	//	returned to the initialized-but-closed state (1) so it may be reopened with Open().
	//
	//	Arguments:	None
	//
	//	Returns:	E_NOINIT	The repository has not been initialized
	//				E_NOTOPEN	The repository is initialized but not open
	//				E_OK		The operation was successful

	_hzfunc("hdbBinRepos::Close") ;

	//	Anything short of the open state is an error: either not initialized at all, or initialized but not open
	if (m_nInitState < 2)
		return (m_nInitState < 1) ? E_NOINIT : E_NOTOPEN ;

	//	Readers first, then writers
	if (m_RdI.is_open())	{ m_RdI.close() ; }
	if (m_RdD.is_open())	{ m_RdD.close() ; }
	if (m_WrI.is_open())	{ m_WrI.close() ; }
	if (m_WrD.is_open())	{ m_WrD.close() ; }

	threadLog("CLOSED %s\n", *m_Name) ;

	m_nInitState = 1 ;
	return E_OK ;
}
hzEcode	hdbBinRepos::Insert	(uint32_t& datumId, const hzChain& datum)
{
	//	Insert the supplied datum (hzChain content) into the repository and set the supplied datum id to the id assigned to the datum.
	//
	//	A header (timestamp, size, data file address) is appended to the index file and the datum itself is appended to the data file. The header's address is the
	//	running data size before this datum was added.
	//
	//	Arguments:	1)	datumId	Set by this function to the id assigned to the new datum (0 if the datum is empty or the repository is not open)
	//				2)	datum	The datum.
	//
	//	Returns:	E_NOINIT	If this hdbBinRepos instance has not been initialized
	//				E_NOTOPEN	If this hdbBinRepos instance has not been opened
	//				E_NODATA	If the supplied datum is of zero size
	//				E_WRITEFAIL	If the data could not be written to disk.
	//				E_OK		If operation successful

	_hzfunc("hdbBinRepos::Insert") ;

	_datum_hd	hdr ;			//	Datum header
	hzEcode		rc = E_OK ;		//	Return code

	if (m_nInitState < 1)	return E_NOINIT ;
	if (m_nInitState < 2)	return E_NOTOPEN ;

	datumId = 0 ;
	if (!datum.Size())
		return E_NODATA ;

	hdr.m_DTStamp.SysDateTime() ;
	hdr.m_Size = datum.Size() ;
	hdr.m_Prev = 0 ;

	//	Both writer locks are held for the whole append, always taken in the order index then data. Previously the index lock was released before the data lock was
	//	taken, so a second writer could append its data ahead of ours, leaving both headers pointing at the other's bytes. Flushing is also done under the locks, as
	//	the streams are shared with other writers.
	m_LockIwr.Lock() ;
	m_LockDwr.Lock() ;

	hdr.m_Addr = m_nSize ;
	datumId = ++m_nSeqId ;
	m_nPopulation++ ;
	m_nSize += datum.Size() ;

	//	Write to the index
	m_WrI.write((char*) &hdr, sizeof(_datum_hd)) ;
	if (m_WrI.fail())
	{
		m_Error.Printf("Could not update index delta for datum %u\n", datumId) ;
		rc = E_WRITEFAIL ;
	}
	else
	{
		//	Write to the data file
		m_WrD << datum ;
		if (m_WrD.fail())
		{
			m_Error.Printf("Could not update data file for datum %u\n", datumId) ;
			rc = E_WRITEFAIL ;
		}
	}

	if (rc == E_OK)
	{
		//	Flush the data before the index, so the explicit flush never publishes an index entry ahead of the datum it describes
		m_WrD.flush() ;
		m_WrI.flush() ;
		if (m_WrD.fail() || m_WrI.fail())
		{
			m_Error.Printf("Could not flush index or data file for datum %u\n", datumId) ;
			rc = E_WRITEFAIL ;
		}
	}

	m_LockDwr.Unlock() ;
	m_LockIwr.Unlock() ;

	return rc ;
}
hzEcode	hdbBinRepos::Update	(uint32_t& datumId, const hzChain& datum)
{
	//	Replace the datum identified by the supplied id (arg1) with the contents of the supplied hzChain (arg2).
	//
	//	Nothing is overwritten. The new content is appended as a new datum with a new id, and its header records the id it replaces (m_Prev). The population count is
	//	therefore unchanged.
	//
	//	Arguments:	1)	datumId	Datum ID - supplied as the id of the original datum, set by this function to the id of the new datum
	//				2)	datum	The datum
	//
	//	Returns:	E_NOINIT	The repository is not initialized
	//				E_NOTOPEN	The repository is not open, or the supplied datum id is zero
	//				E_NOTFOUND	The supplied datum id is beyond the highest id issued
	//				E_NODATA	The supplied datum is empty
	//				E_WRITEFAIL	The index or data file could not be written
	//				E_OK		The operation was successful

	_hzfunc("hdbBinRepos::Update") ;

	_datum_hd	hdr ;			//	Datum header
	hzEcode		rc = E_OK ;		//	Return code

	//	Repos not initialized?
	if (m_nInitState < 1)
		return E_NOINIT ;

	//	Repos not open?
	if (m_nInitState < 2)
		return E_NOTOPEN ;

	//	No datum id supplied? (E_NOTOPEN is retained here for compatibility with existing callers)
	if (datumId == 0)
		return E_NOTOPEN ;

	//	Datum id out of range
	if (datumId > m_nSeqId)
		return E_NOTFOUND ;

	//	Empty datum?
	if (!datum.Size())
		return E_NODATA ;

	hdr.m_DTStamp.SysDateTime() ;
	hdr.m_Size = datum.Size() ;
	hdr.m_Prev = datumId ;

	//	Both writer locks are held for the whole append, in the same order as Insert (index then data), so data is appended in the same order as the headers that
	//	address it. Flushing is done under the locks as the streams are shared with other writers.
	m_LockIwr.Lock() ;
	m_LockDwr.Lock() ;

	hdr.m_Addr = m_nSize ;
	datumId = ++m_nSeqId ;
	m_nSize += datum.Size() ;

	//	Write to the index
	m_WrI.write((char*) &hdr, sizeof(_datum_hd)) ;
	if (m_WrI.fail())
	{
		m_Error.Printf("Could not update index delta for datum %u\n", datumId) ;
		rc = E_WRITEFAIL ;
	}
	else
	{
		//	Write to the data file
		m_WrD << datum ;
		if (m_WrD.fail())
		{
			m_Error.Printf("Could not update data file for datum %u\n", datumId) ;
			rc = E_WRITEFAIL ;
		}
	}

	if (rc == E_OK)
	{
		//	Flush the data before the index, so the explicit flush never publishes an index entry ahead of the datum it describes
		m_WrD.flush() ;
		m_WrI.flush() ;
		if (m_WrD.fail() || m_WrI.fail())
		{
			m_Error.Printf("Could not flush index or data file for datum %u\n", datumId) ;
			rc = E_WRITEFAIL ;
		}
	}

	m_LockDwr.Unlock() ;
	m_LockIwr.Unlock() ;

	return rc ;
}
hzEcode	hdbBinRepos::Fetch	(hzChain& datum, uint32_t datumId) const
{
	//	Fetch a single datum from the repository into the supplied chain.
	//
	//	The datum header is read from the index file. Headers are fixed size, so the header for datum N is at offset (N-1) * sizeof(_datum_hd). The datum is then read
	//	from the data file at the address and size recorded in the header. The first read stops at the next HZ_BLOCKSIZE boundary so later reads are block aligned.
	//
	//	Arguments:	1)	datum	The chain to be populated. It is cleared first, and is left empty if the datum cannot be read in full.
	//				2)	datumId	The datum id (ids start at 1)
	//
	//	Returns:	E_NOINIT	The repository is not initialized
	//				E_NOTOPEN	The repository is not open
	//				E_RANGE		The datum id is zero or beyond the highest id issued
	//				E_READFAIL	The header or the datum could not be located or read in full
	//				E_OK		The operation was successful

	_hzfunc("hdbBinRepos::Fetch") ;

	std::vector<char>	workBuf ;	//	Working buffer (released automatically on every return path)
	_datum_hd			hdr ;		//	Datum header
	uint64_t			nAddr ;		//	Addr of header in index file
	uint64_t			nPosn ;		//	Posn of reader after seek
	uint32_t			toGet ;		//	Bytes to get in the next read operation
	uint32_t			nGot = 0 ;	//	Bytes actually obtained by the last read operation
	uint32_t			soFar ;		//	Bytes read so far
	hzEcode				rc = E_OK ;	//	Return code

	datum.Clear() ;

	//	Repos not initialized?
	if (m_nInitState < 1)
		return E_NOINIT ;

	//	Repos not open?
	if (m_nInitState < 2)
		return E_NOTOPEN ;

	//	Zero datum id?
	if (!datumId)
		return hzerr(E_RANGE, "NULL datum id") ;

	//	Datum id out of range? Fail now rather than attempting a read beyond the end of the index, which would report E_READFAIL instead.
	if (datumId > m_nSeqId)
	{
		threadLog("Out of range\n") ;
		return E_RANGE ;
	}

	m_Error.Clear() ;

	//	Read in the datum header
	m_LockIrd.Lock() ;

	nAddr = (uint64_t) (datumId - 1) * sizeof(_datum_hd) ;

	m_RdI.seekg((streamoff) nAddr) ;
	nPosn = m_RdI.tellg() ;
	if (nPosn != nAddr)
	{
		rc = E_READFAIL ;
		threadLog("Could not seek header address %llu for datum %u\n", (unsigned long long) nAddr, datumId) ;
		m_RdI.clear() ;		//	Otherwise the failed state would make every later seek on this stream fail too
	}
	else
	{
		threadLog("Found header address %llu for datum %u\n", (unsigned long long) nAddr, datumId) ;
		m_RdI.read((char*) &hdr, sizeof(_datum_hd)) ;
		if (m_RdI.fail())
		{
			rc = E_READFAIL ;
			threadLog("Could not read header for datum %u\n", datumId) ;
			m_RdI.clear() ;
		}
	}

	m_LockIrd.Unlock() ;

	if (rc != E_OK)
		return rc ;

	//	Read in the datum itself
	workBuf.resize(HZ_BLOCKSIZE) ;
	m_LockDrd.Lock() ;

	threadLog("Found datum for datum %u addr %llu, size %u\n", datumId, (unsigned long long) hdr.m_Addr, (unsigned) hdr.m_Size) ;
	m_RdD.seekg((streamoff) hdr.m_Addr) ;
	nPosn = m_RdD.tellg() ;
	if (nPosn != hdr.m_Addr)
	{
		rc = E_READFAIL ;
		threadLog("Could not seek data address %llu for datum %u\n", (unsigned long long) hdr.m_Addr, datumId) ;
		m_RdD.clear() ;
	}
	else
	{
		//	First read only as far as the next block boundary, then whole blocks, each capped at the bytes still outstanding
		toGet = HZ_BLOCKSIZE - (hdr.m_Addr % HZ_BLOCKSIZE) ;

		for (soFar = 0 ; soFar < hdr.m_Size ; soFar += nGot)
		{
			if (toGet > hdr.m_Size - soFar)
				toGet = hdr.m_Size - soFar ;

			m_RdD.read(&workBuf[0], toGet) ;
			nGot = (uint32_t) m_RdD.gcount() ;

			//	Append only the bytes actually read. The rest of the buffer is stale.
			if (nGot)
				datum.Append(&workBuf[0], nGot) ;

			if (nGot != toGet)
			{
				threadLog("Warning: toget %u, got %u\n", toGet, nGot) ;
				rc = E_READFAIL ;
				break ;
			}

			toGet = HZ_BLOCKSIZE ;
		}

		if (rc != E_OK || m_RdD.fail())
		{
			rc = E_READFAIL ;
			threadLog("m_RdD failed\n") ;
			m_RdD.clear() ;
			datum.Clear() ;		//	Do not hand back a partial datum
		}
		else
			threadLog("Now have %u bytes in Datum\n", datum.Size()) ;
	}

	m_LockDrd.Unlock() ;
	return rc ;
}
hzEcode	hdbBinRepos::Integ	(hzLogger& log)
{
	//	Perform an integrity check on the binary datum repository by reading the index file with its own stream, separate from the repository's reader.
	//
	//	Checks made:
	//		- The index file holds a whole number of headers.
	//		- Datum addresses are in ascending order, with no datum starting before the end of the previous one.
	//		- The header sizes total exactly the data file size.
	//	Each failure is reported to the supplied log. Timestamps are not currently checked.
	//
	//	Argument:	log	The log-channel for the integrity report
	//
	//	Returns:	E_NOINIT	The repository is not initialized
	//				E_OPENFAIL	The index file could not be opened
	//				E_CORRUPT	One or more of the checks above failed
	//				E_OK		The repository passed all checks

	_hzfunc("hdbBinRepos::Integ") ;

	ifstream		is ;				//	Input stream for index file (separate from the in-built)
	struct stat		fs ;				//	Data file status
	_datum_hd*		pHd ;				//	Datum header pointer
	_datum_hd		arrHd[128] ;		//	Datum header array (128 headers)
	const uint32_t	nMaxItems = sizeof(arrHd) / sizeof(arrHd[0]) ;	//	Headers per read
	uint64_t		nextAddr = 0 ;		//	Lowest address at which the next datum may start
	uint64_t		totalSize = 0 ;		//	Total of datum sizes
	uint32_t		nBytes ;			//	Bytes read in from index file
	uint32_t		nItems ;			//	Number of whole headers read in from index file
	uint32_t		nC ;				//	Header iterator
	uint32_t		totalItems = 0 ;	//	Total number of headers
	hzEcode			rc = E_OK ;			//	Return code

	//	Repos not initialized?
	if (m_nInitState < 1)
		return E_NOINIT ;

	//	Open index file
	is.open(*m_FileIndx) ;
	if (is.fail())
		return hzerr(E_OPENFAIL, "Could not open index file %s\n", *m_FileIndx) ;

	//	Process index blocks. The read size is tied to the array size, so the read can never overrun the array whatever sizeof(_datum_hd) is.
	for (;;)
	{
		is.read((char*) arrHd, sizeof(arrHd)) ;
		nBytes = (uint32_t) is.gcount() ;

		if (nBytes % sizeof(_datum_hd))
		{
			log.Log("Unexpected read size %u\n", nBytes) ;
			rc = E_CORRUPT ;
		}
		nItems = nBytes / sizeof(_datum_hd) ;

		for (pHd = arrHd, nC = 0 ; nC < nItems ; nC++, pHd++)
		{
			totalItems++ ;

			if ((uint64_t) pHd->m_Addr < nextAddr)
			{
				log.Log("Datum %u addr %llu starts before end of previous datum (%llu)\n",
					totalItems, (unsigned long long) pHd->m_Addr, (unsigned long long) nextAddr) ;
				rc = E_CORRUPT ;
			}

			nextAddr = (uint64_t) pHd->m_Addr + pHd->m_Size ;
			totalSize += pHd->m_Size ;
		}

		//	A short read means the end of the index has been reached
		if (nItems < nMaxItems)
			break ;
	}

	is.close() ;

	log.Log("Total items %u\n", totalItems) ;
	log.Log("Total size %llu\n", (unsigned long long) totalSize) ;

	//	The header sizes must account for every byte of the data file
	if (lstat(*m_FileData, &fs) == -1)
	{
		if (totalSize)
		{
			log.Log("Data file %s missing but index declares %llu bytes\n", *m_FileData, (unsigned long long) totalSize) ;
			rc = E_CORRUPT ;
		}
	}
	else if ((uint64_t) fs.st_size != totalSize)
	{
		log.Log("Data file %s size %llu does not match total datum size %llu\n",
			*m_FileData, (unsigned long long) fs.st_size, (unsigned long long) totalSize) ;
		rc = E_CORRUPT ;
	}

	return rc ;
}
hzEcode	hdbBinRepos::Delete	(uint32_t datumId)
{
	//	Delete an object from the datacron.
	//
	//	This function appears only for consistency with the base class. It validates the repository state and the datum id, but removes nothing.
	//
	//	Arguments:	1)	datumId	The object id to be deleted
	//
	//	Returns:	E_NOINIT	The datacron is not initialized
	//				E_NOTOPEN	The datacron is not open
	//				E_RANGE		The datum id is zero or beyond the highest id issued
	//				E_OK		Operation successful

	_hzfunc("hdbBinRepos::Delete") ;

	//	Repos not initialized?
	if (m_nInitState < 1)
		return E_NOINIT ;

	//	Repos not open?
	if (m_nInitState < 2)
		return E_NOTOPEN ;

	//	Valid ids run from 1 to m_nSeqId inclusive, as in Fetch and Update. The previous '>=' test wrongly rejected the most recently issued id.
	if (datumId < 1 || datumId > m_nSeqId)
		return E_RANGE ;

	return E_OK ;
}