//
// File: hdbBinRepos.cpp
//
// Legal Notice: This file is part of the HadronZoo C++ Class Library. Copyright 2025 HadronZoo Project (http://www.hadronzoo.com)
//
// The HadronZoo C++ Class Library is free software: You can redistribute it, and/or modify it under the terms of the GNU Lesser General Public License, as published by the Free
// Software Foundation, either version 3 of the License, or any later version.
//
// The HadronZoo C++ Class Library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License along with the HadronZoo C++ Class Library. If not, see http://www.gnu.org/licenses.
//
#include <iostream>
#include <fstream>
#include <cstdio>
using namespace std ;
#include <unistd.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
//#include "hzChain.h"
#include "hzDirectory.h"
#include "hzDatabase.h"
/*
** Variables
*/
/*
** SECTION 1: hdbBinRepos Functions
*/
hdbBinRepos::hdbBinRepos (hdbADP& adp)
{
m_pADP = &adp ;
m_nSeqId = 0 ;
m_nPopulation = 0 ;
m_nInitState = 0 ;
_hzGlobal_Memstats.m_numBincron++ ;
}
hdbBinRepos::~hdbBinRepos (void)
{
if (m_nInitState == 2)
Close() ;
_hzGlobal_Memstats.m_numBincron-- ;
}
/*
** hdbBinRepos Init and Halt Functions
*/
hzEcode hdbBinRepos::Init (const hzString& name, const hzString& opdir)
{
// Initialize the hdbBinRepos instance. This ensures the working directory exists and both the index and data file exist.
//
// Note that this function does not leave index or data files open. This is left to hdbBinRepos::Open() as a separate step.
//
// Arguments: 1) name Name of object set to be stored in the hdbBinRepos (will be base of the two filenames)
// 2) opdir Full pathname of operational directory
//
// Returns: E_OPENFAIL The files could not be opened or created (no space or wrong permissions)
// E_OK Operation was successful.
//
// Errors: Any false return is due to an irrecoverable error. The calling function
// must check the return value.
_hzfunc("hdbBinRepos::Init") ;
FSTAT fs ; // File status
ifstream is ; // Input stream for reading index file
hzEcode rc ; // Return code
if (m_nInitState)
return hzerr(E_INITDUP, "Resource already initialized") ;
// Check we have a name and working directory
if (!name)
return hzerr(E_ARGUMENT, "No name") ;
if (!opdir)
return hzerr(E_ARGUMENT, "Repository %s: No working directory", *name) ;
// Check we do not already have a datacron of the name
if (m_pADP->GetBinRepos(name))
return hzerr(E_DUPLICATE, "Binary datum cron %s already exists", *name) ;
// Assign internal structure, name and working directory
m_Name = name ;
m_Workdir = opdir ;
threadLog("called with cron name %s and workdir %s\n", *m_Name, *m_Workdir) ;
// Assert working directory
if (lstat(*m_Workdir, &fs) == -1)
{
rc = AssertDir(*m_Workdir, 0777) ;
if (rc != E_OK)
return hzerr(rc, "Could not assert working dir of %s", *m_Workdir) ;
}
// Set the pathnames for the index and data file
m_FileIndx = m_Workdir + "/" + m_Name + ".idx" ;
m_FileData = m_Workdir + "/" + m_Name + ".dat" ;
if (lstat(*m_FileIndx, &fs) == -1)
m_nSeqId = m_nPopulation = 0 ;
else
m_nSeqId = m_nPopulation = fs.st_size / sizeof(_datum_hd) ;
if (lstat(*m_FileData, &fs) == -1)
m_nSize = 0 ;
else
m_nSize = fs.st_size ;
// Set init state etc
threadLog("A Initialized %s (%p) count %u (size %u) with err=%s\n", *m_Name, this, Count(), m_nSize, Err2Txt(rc)) ;
rc = m_pADP->RegisterBinRepos(this) ;
threadLog("B Initialized %s (%p) count %d with files %s and %s. err=%s\n",
*m_Name, this, m_pADP->CountBinRepos(), *m_FileIndx, *m_FileData, Err2Txt(rc)) ;
m_nInitState = 1 ;
return E_OK ;
}
hzEcode hdbBinRepos::Open (void)
{
// Open the hdbBinRepos instance.
//
// This is a matter of opening an input and an output stream for both index the data files. Note the output streams are opened with ios::app as they will only ever append.
//
// Arguments: None
//
// Returns: E_NOINIT If the repository has not been initialized
// E_SEQUENCE If the repository is already open
// E_OPENFAIL If either the index or data file cannot be opened for either reading or writing
// E_OK If the operation was successful
_hzfunc("hdbBinRepos::Open") ;
if (m_nInitState == 0) hzexit(E_NOINIT, "Cannot open an uninitialized datacron") ;
if (m_nInitState == 2) hzexit(E_SEQUENCE, "Datacron %s is already open", *m_Name) ;
m_WrI.open(*m_FileIndx, std::ios::app) ;
if (m_WrI.fail())
return hzerr(E_OPENFAIL, "Cannot open index file for writing: Repos %s", *m_FileIndx) ;
threadLog("Opened index file for writing: Repos %s\n", *m_FileIndx) ;
m_WrD.open(*m_FileData, std::ios::app) ;
if (m_WrD.fail())
return hzerr(E_OPENFAIL, "Could not open file (%s) for writing", *m_FileData) ;
threadLog("Opened data file for writing: Repos %s\n", *m_FileData) ;
m_RdI.open(*m_FileIndx) ;
if (m_RdI.fail())
return hzerr(E_OPENFAIL, "Could not open index file (%s) for reading", *m_FileIndx) ;
threadLog("Opened index file for reading: Repos %s\n", *m_FileIndx) ;
m_RdD.open(*m_FileData) ;
if (m_RdD.fail())
return hzerr(E_OPENFAIL, "Could not open data file (%s) for reading", *m_FileData) ;
threadLog("Opened index file for reading: Repos %s\n", *m_FileData) ;
threadLog("INITIALIZED %s\n", *m_Name) ;
m_nInitState = 2 ;
return E_OK ;
}
hzEcode hdbBinRepos::Close (void)
{
// Close the Binary Repository.
//
// Arguments: None
//
// Returns: E_NOINIT The datacron is not initialized
// E_SEQUENCE The datacron is not open
// E_OK The operation was successful
_hzfunc("hdbBinRepos::Close") ;
if (m_nInitState < 1) return E_NOINIT ;
if (m_nInitState < 2) return E_NOTOPEN ;
if (m_RdI.is_open()) m_RdI.close() ;
if (m_RdD.is_open()) m_RdD.close() ;
if (m_WrI.is_open()) m_WrI.close() ;
if (m_WrD.is_open()) m_WrD.close() ;
threadLog("CLOSED %s\n", *m_Name) ;
m_nInitState = 1 ;
return E_OK ;
}
hzEcode hdbBinRepos::Insert (uint32_t& datumId, const hzChain& datum)
{
// Insert the supplied datum (hzChain content) into the repository and set the supplied datum id in respect of the datum.
//
// This is a matter of appending the data file of the current partition with the datum, and of appending the index file of the current partition with the datum metadata.
//
// Purpose: Insert data contained in the supplied row source. This could be any of the row source
// classes. This function is for mass import
//
// Arguments: 1) datumId The datum id - the address the will occupy (if specified)
// 2) datum The datum.
//
// Returns: E_NOINIT If this hdbBinRepos instance has not been initialized
// E_NODATA If the supplied datum is of zero size
// E_WRITEFAIL If the data could not be written to disk.
// E_OK If operation successful
_hzfunc("hdbBinRepos::Insert") ;
_datum_hd hdr ; // Datum header
hzEcode rc = E_OK ; // Return code
if (m_nInitState < 1) return E_NOINIT ;
if (m_nInitState < 2) return E_NOTOPEN ;
datumId = 0 ;
if (!datum.Size())
return E_NODATA ;
hdr.m_DTStamp.SysDateTime() ;
hdr.m_Size = datum.Size() ;
hdr.m_Prev = 0 ;
// Write to the index
m_LockIwr.Lock() ;
hdr.m_Addr = m_nSize ;
datumId = ++m_nSeqId ;
m_nPopulation++ ;
m_nSize += datum.Size() ;
m_WrI.write((char*) &hdr, sizeof(_datum_hd)) ;
if (m_WrI.fail())
{ m_Error.Printf("Could not update index delta for datum %d\n", datumId) ; rc = E_WRITEFAIL ; }
m_LockIwr.Unlock() ;
m_LockDwr.Lock() ;
if (rc == E_OK)
{
// Write to the data file
m_WrD << datum ;
if (m_WrD.fail())
{ m_Error.Printf("Could not update data file for datum %d\n", datumId) ; rc = E_WRITEFAIL ; }
}
m_LockDwr.Unlock() ;
if (rc == E_OK)
{
m_WrI.flush() ;
m_WrD.flush() ;
}
return rc ;
}
hzEcode hdbBinRepos::Update (uint32_t& datumId, const hzChain& datum)
{
// Replace the datum identified by the supplied id (arg1), with the contents of the supplied hzChain (arg2). In all cases this action is a DEPRECATE of the existing datum, and
// an INSERT of the new.
//
// Arguments: 1) datumId Datum ID - Set in the call to the original datum, set by this function to the new datum id
// 2) datum The datum
//
// Returns: E_NOINIT The datacron is not initialized
// E_NOTOPEN Attempt to modify object zero
// E_NOTFOUND Attempt to modify non existant object
// E_WRITEFAIL The datacron is not open for writing
// E_RANGE The requested item does not exist
// E_OK The operation was successful
_hzfunc("hdbBinRepos::Update") ;
_datum_hd hdr ; // Datum header
hzEcode rc = E_OK ; // Return code
// Repos not initialized?
if (m_nInitState < 1)
return E_NOINIT ;
// Repos not open?
if (m_nInitState < 2)
return E_NOTOPEN ;
// No datum id supplied?
if (datumId == 0)
return E_NOTOPEN ;
// Datum id out of range
if (datumId > m_nSeqId)
return E_NOTFOUND ;
// Empty datum?
if (!datum.Size())
return E_NODATA ;
hdr.m_DTStamp.SysDateTime() ;
hdr.m_Size = datum.Size() ;
hdr.m_Prev = datumId ;
// rem = ((16 - datum.Size()) % 16) % 16 ;
// tot = datum.Size() + rem ;
// Write to the index
m_LockIwr.Lock() ;
hdr.m_Addr = m_nSize ;
datumId = ++m_nSeqId ;
m_nSize += datum.Size() ;
m_WrI.write((char*) &hdr, sizeof(_datum_hd)) ;
// Internal write error?
if (m_WrI.fail())
{ m_Error.Printf("Could not update index delta for datum %d\n", datumId) ; rc = E_WRITEFAIL ; }
m_LockIwr.Unlock() ;
m_LockDwr.Lock() ;
// No errors?
if (rc == E_OK)
{
// Write to the data file
m_WrD << datum ;
// Internal write error?
if (m_WrD.fail())
{ m_Error.Printf("Could not update data file for datum %d\n", datumId) ; rc = E_WRITEFAIL ; }
}
m_LockDwr.Unlock() ;
// No errors?
if (rc == E_OK)
{
// Flush index and data files
m_WrI.flush() ;
m_WrD.flush() ;
}
return rc ;
}
hzEcode hdbBinRepos::Fetch (hzChain& datum, uint32_t datumId) const
{
// Purpose: Fetches a single row of data into the supplied object.
//
// Arguments: 1) obj The object to be populated
// 2) datumId The object id
//
// Returns: E_NOTOPEN If the reader stream is not open for reading
// E_READFAIL The reader stream is already in a fialed state
// E_CORRUPT Row metadata not as expected. Possible overwrites
// E_FORMAT Data format error
// E_OK The operation was successful
_hzfunc("hdbBinRepos::Fetch") ;
_datum_hd hdr ; // Datum header
char* workBuf ; // Working buffer
uint64_t nAddr ; // Addr of object in file
uint64_t nPosn ; // Posn of m_Reader
uint32_t toGet ; // Bytes to get in the next read operation
uint32_t soFar ; // Bytes read so far
hzEcode rc = E_OK ; // Return code
//if (datum.Copies() > 1)
// threadLog("Copies of chain %u\n", datum.Copies()) ;
datum.Clear() ;
// Repos not initialized?
if (m_nInitState < 1)
return E_NOINIT ;
// Repos not open?
if (m_nInitState < 2)
return E_NOTOPEN ;
// Zero datum id?
if (!datumId)
return hzerr(E_RANGE, "NULL datum id") ;
// Datum id out of range
if (datumId > m_nSeqId)
{
threadLog("Out of range\n") ;
rc = E_RANGE ;
}
m_Error.Clear() ;
// Read in requested object
m_LockIrd.Lock() ;
// Get datum header
nAddr = (datumId-1) * sizeof(_datum_hd) ;
m_RdI.seekg((streamoff) nAddr) ;
nPosn = m_RdI.tellg() ;
if (nPosn != nAddr)
{
rc = E_READFAIL ;
threadLog("Could not seek header address %u for datum %u\n", nAddr, datumId) ;
}
else
{
threadLog("Found header address %u for datum %u\n", nAddr, datumId) ;
m_RdI.read((char*) &hdr, sizeof(_datum_hd)) ;
if (m_RdI.fail())
{
rc = E_READFAIL ;
threadLog("Could not read header for datum %u", datumId) ;
m_RdI.clear() ;
}
}
m_LockIrd.Unlock() ;
if (rc != E_OK)
return rc ;
workBuf = new char[HZ_BLOCKSIZE] ;
m_LockDrd.Lock() ;
// Get datum
threadLog("Found datum for datum %u addr %u, size %u\n", datumId, hdr.m_Addr, hdr.m_Size) ;
m_RdD.seekg((streamoff) hdr.m_Addr) ;
nPosn = m_RdD.tellg() ;
if (nPosn != hdr.m_Addr)
{
rc = E_READFAIL ;
threadLog("Could not seek data address %ul for datum %d\n", hdr.m_Addr, datumId) ;
}
else
{
//threadLog("case A hdr size %u\n", hdr.m_Size) ;
toGet = HZ_BLOCKSIZE - (hdr.m_Addr % HZ_BLOCKSIZE) ;
if (toGet > hdr.m_Size)
toGet = hdr.m_Size ;
soFar = 0 ;
//threadLog("case B toget %u\n", toGet) ;
m_RdD.read(workBuf, toGet) ;
if (m_RdD.gcount() != toGet)
threadLog("Warning: toget %u, got %u\n", toGet, m_RdD.gcount()) ;
datum.Append(workBuf, toGet) ;
soFar += toGet ;
for (; soFar < hdr.m_Size ;)
{
toGet = hdr.m_Size - soFar ;
if (toGet > HZ_BLOCKSIZE)
toGet = HZ_BLOCKSIZE ;
//threadLog("case C sofar %u toget %u\n", soFar, toGet) ;
m_RdD.read(workBuf, toGet) ;
if (m_RdD.gcount() != toGet)
threadLog("Warning: toget %u, got %u\n", toGet, m_RdD.gcount()) ;
datum.Append(workBuf, toGet) ;
soFar += toGet ;
}
//threadLog("case 5\n") ;
if (m_RdD.fail())
{
rc = E_READFAIL ;
threadLog("m_RdD failed\n") ;
m_RdD.clear() ;
}
threadLog("Now have %u bytes in Datum\n", datum.Size()) ;
}
m_LockDrd.Unlock() ;
delete [] workBuf ;
return rc ;
}
hzEcode hdbBinRepos::Integ (hzLogger& log)
{
// Performs an integrity check on the binary datum repository. The test expects to find in the index file, the time/date stamp and addresses of datum to be in ascending order.
// It further expects to find the sizes total exactly matches the data file size.
//
// Argument: log The log-channel for the integrity report
//
// Returns: E_NOTOPEN If the reader stream is not open for reading
// E_READFAIL The reader stream is already in a fialed state
// E_CORRUPT Row metadata not as expected. Possible overwrites
// E_FORMAT Data format error
// E_OK The operation was successful
_hzfunc("hdbBinRepos::Integ") ;
ifstream is ; // Input stream for index file (separate from the in-built)
_datum_hd* pHd ; // Datum header pointer
_datum_hd arrHd[128] ; // Datum header array (128 headers)
uint32_t nBytes ; // Bytes read in from index file
uint32_t nItems ; // Number of headers read in from index file
uint32_t nC ; // Header iterator
uint32_t totalItems = 0 ; // Bytes to get in the next read operation
uint32_t totalSize ; // Bytes to get in the next read operation
hzEcode rc = E_OK ; // Return code
// Repos not initialized?
if (m_nInitState < 1)
return E_NOINIT ;
// Open index file
is.open(*m_FileIndx) ;
// Index file open failure?
if (is.fail())
return hzerr(E_OPENFAIL, "Could not open index file %s\n", *m_FileIndx) ;
// Process index blocks
for (;;)
{
is.read((char*) arrHd, 4096) ;
nBytes = is.gcount() ;
if (nBytes % sizeof(_datum_hd))
{
log.Log("Unexpected read size %d\n", nBytes) ;
break ;
}
nItems = nBytes/sizeof(_datum_hd) ;
for (pHd = arrHd, nC = 0 ; nC < nItems ; nC++, pHd++)
{
totalItems++ ;
//log.Out("Datum addr %u id %u time/date %s size %u\n", pHd->m_Addr, totalItems, *pHd->m_DTStamp.Str(), pHd->m_Size) ;
totalSize += pHd->m_Size ;
}
if (nItems < 128)
break ;
}
is.close() ;
log.Log("Total items %u\n", totalItems) ;
log.Log("Total size %u\n", totalSize) ;
return rc ;
}
hzEcode hdbBinRepos::Delete (uint32_t datumId)
{
// Delete an object from the datacron.
//
// This function appears only for consistency with the base class and does nothing.
//
// Arguments: 1) datumId The object id to be deleted
//
// Returns: E_NOINIT The datacron is not initialized
// E_SEQUENCE The datacron is not open for writing
// E_RANGE The requested item does not exist
// E_OK Operation successful
_hzfunc("hdbBinRepos::Delete") ;
// Repos not initialized?
if (m_nInitState < 1)
return E_NOINIT ;
// Repos not open?
if (m_nInitState < 2)
return E_NOTOPEN ;
// Datum id out of range
if (datumId < 1 || datumId >= m_nSeqId)
return E_RANGE ;
return E_OK ;
}