//
// File: hzDelta.cpp
//
// Legal Notice: This file is part of the HadronZoo C++ Class Library. Copyright 2025 HadronZoo Project (http://www.hadronzoo.com)
//
// The HadronZoo C++ Class Library is free software: You can redistribute it, and/or modify it under the terms of the GNU Lesser General Public License, as published by the Free
// Software Foundation, either version 3 of the License, or any later version.
//
// The HadronZoo C++ Class Library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
// A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License along with the HadronZoo C++ Class Library. If not, see http://www.gnu.org/licenses.
//
//
// Interface to Delta Server
//
#include <iostream>
#include <fstream>
using namespace std ;
#include <stdarg.h>
#include <unistd.h>
#include <netdb.h>
#include <dirent.h>
#include <sys/stat.h>
#include <sys/un.h>
#include "hzDocument.h"
#include "hzDatabase.h"
#include "hzDirectory.h"
#include "hzProcess.h"
#include "hzIpServer.h"
using namespace std ;
/*
** Variables
*/
static hzMapS <hzString,uint16_t> s_mapDS_AppsByName ; // 1:1 map of app names to app ids
static hzMapS <uint16_t,hzString> s_mapDS_AppsByID ; // 1:1 map of app ids to app names
static hzString s_DS_Hosts[4] ; // DS Hosts (Cluster machines)
static hzIpaddr s_DS_Addrs[4] ; // DS machine IP addresses
static uint16_t s_DS_Ports[4] ; // DS ports
static uint32_t s_deltaPort ; // Delta server port on local machine
const hzString s_deltaConf = "/home/deltasvr/cluster.xml" ; // Standard pathname of Delta config file
/*
** Delta Functions
*/
#if 0
hzEcode ReadConfigDS (void)
{
// This reads the DS configs from the standard location /home/deltasvr/cluster.xml, which if it exists, is presumed to be owned by root.
_hzfunc(__func__) ;
FSTAT fs ; // File status
hzDocXml X ; // XML document
hzChain err ; // Error report
hzAttrset ai ; // Attribute iterator
hzXmlNode* pRoot ; // Root XML tag
hzXmlNode* pN ; // XML node/tag
hzXmlNode* pNC ; // XML node for <deltaCluster> tag
hzXmlNode* pNS ; // XML node for <point[A-D]> tags
hzString appName ; // Application name
hzIpaddr ipa ; // IP address
uint32_t nAppId ; // App id
uint32_t nPort ; // Port number
uint32_t nIndex ; // Server iterator
hzEcode rc ; // Return code
/*
** Read the DS config to obtain delta cluster info and to find the appID as used by the DS
*/
if (lstat(s_deltaConf, &fs) < 0)
return hzerr(E_NOTFOUND, "Delta config file %s not found\n", *s_deltaConf) ;
rc = X.Load(s_deltaConf) ;
if (rc != E_OK)
return hzerr(rc, "Config (%s): Could not load\n", *s_deltaConf) ;
pRoot = X.GetRoot() ;
if (!pRoot->NameEQ("ConfigDS"))
return hzerr(rc, "Config (%s): Expected <ConfigDS> root tag. Got <%s>\n", *s_deltaConf, pRoot->txtName()) ;
// Process <deltaCluster> tag
pNC = pRoot->GetFirstChild() ;
if (!pNC || !pNC->NameEQ("deltaCluster"))
return hzerr(rc, "Config (%s): Expected <deltaCluster> tag as 1st child of <ConfigDS>\n", *s_deltaConf) ;
// Expect <deltaCluster> subtags to describe servers
for (pNS = pNC->GetFirstChild() ; pNS ; pNS = pNS->Sibling())
{
if (pNS->NameEQ("pointA")) nIndex = 0 ;
else if (pNS->NameEQ("pointB")) nIndex = 1 ;
else if (pNS->NameEQ("pointC")) nIndex = 2 ;
else if (pNS->NameEQ("pointD")) nIndex = 3 ;
else
return hzerr(E_CONFIG, "Illegal tag <%s> in <deltaCluster>. Only <point[A-D]> allowed\n", pNS->txtName()) ;
if (s_DS_Hosts[nIndex])
return hzerr(E_DUPLICATE, "Duplicate tag <%s> in <deltaCluster>\n", pNS->txtName()) ;
for (ai = pNS ; ai.Valid() ; ai.Advance())
{
if (ai.NameEQ("host"))
s_DS_Hosts[nIndex] = ai.Value() ;
else if (ai.NameEQ("addr"))
{
if (ai.Value())
{
ipa = ai.Value() ;
if (ipa == _hzGlobal_nullIP)
return hzerr(E_CONFIG, "Line %d: Invalid IP address (%s)\n", pNS->Line(), ai.Value()) ;
s_DS_Addrs[nIndex] = ipa ;
}
}
else if (ai.NameEQ("port"))
{
if (ai.Value())
{
nPort = atoi(ai.Value()) ;
if (!nPort)
return hzerr(E_CONFIG, "Line %d: Invalid port (%s)\n", pNS->Line(), ai.Value()) ;
s_DS_Ports[nIndex] = nPort ;
}
}
else
return hzerr(E_CONFIG, "Line %d: Illegal attribute %s. Only host|addr|port allowed", pNS->Line(), ai.Name()) ;
}
// Check we have everything
if (!s_DS_Hosts[nIndex])
return hzerr(E_CONFIG, "No hostname specified for server %s", pNS->txtName()) ;
if (s_DS_Addrs[nIndex] || s_DS_Ports[nIndex])
{
// Both must be either NULL or set
if (s_DS_Addrs[nIndex] == _hzGlobal_nullIP)
return hzerr(E_CONFIG, "Line %d: Invalid IP address (port is %u)\n", pNS->Line(), s_DS_Ports[nIndex]) ;
if (!s_DS_Ports[nIndex])
return hzerr(E_CONFIG, "Line %d: IP address supplied but no port\n", pNS->Line()) ;
// Get port for notifications
if (ipa == _hzGlobal_livehost)
s_deltaPort = nPort ;
}
}
// Process <deltaCluster> siblings. Expect <dsApp> and <dsRealm> tags. Only process <dsApp>
for (pN = pNC->Sibling() ; rc == E_OK && pN ; pN = pN->Sibling())
{
// Only interested in <deltaCluster> here
if (!pN->NameEQ("dsRealm"))
continue ;
if (!pN->NameEQ("dsApp"))
{ rc = E_CONFIG ; err.Printf("Illegal tag <%s> in <ConfigDS>. Only <dsRealm> and <dsApp> allowed\n", pN->txtName()) ; }
// Process DS attributes
for (ai = pN ; ai.Valid() ; ai.Advance())
{
if (ai.NameEQ("appname"))
appName = ai.Value() ;
else if (ai.NameEQ("appid"))
{
if (ai.Value())
{
nAppId = atoi(ai.Value()) ;
if (!nAppId)
{ rc = E_CONFIG ; err.Printf("Line %d: Invalid app id (%s)\n", pNS->Line(), ai.Value()) ; }
}
}
else
{ rc = E_CONFIG ; err.Printf("Line %d: Illegal <dsApp> attribute %s. Only host|addr|port allowed", pN->Line(), ai.Name()) ; }
}
if (!appName) { rc = E_CONFIG ; err.Printf("Line %d: <dsApp> tag does not specify an appName\n", pN->Line()) ; }
if (!nAppId) { rc = E_CONFIG ; err.Printf("Line %d: <dsApp> tag does not specify an app ID\n", pN->Line()) ; }
if (rc == E_OK)
{
if (s_mapDS_AppsByName.Exists(appName)) { rc = E_DUPLICATE ; err.Printf("Line %d: App (%s) already exists\n", pN->Line(), *appName) ; }
if (s_mapDS_AppsByID.Exists(nAppId)) { rc = E_DUPLICATE ; err.Printf("Line %d: App ID (%u) already exists\n", pN->Line(), nAppId) ; }
}
}
if (err.Size())
threadLog(err) ;
return rc ;
}
hzEcode hdbADP::DeltaInit (void)
{
// Category: System
//
// This description assumes knowledge of the HadronZoo Database (HDB), the Delta Server (DS), and the ADP (Application Delta Profile). Please see the HadronZoo library manual,
// section 5.
//
// Programs that use the DS must call DeltaInit() as part of the initialization sequence. This call must take place AFTER data model construction is complete (all repositories
// declared and initialized), and BEFORE data transactions commence.
//
// This function calls ReadConfigDS() in order to establish the following:-
//
//
// The primary objective is to verify the ADP. If the application is new (of a hitherto unknown appname), the ADP is simply accepted. This is safe if the config read succeeded
// as this proves the data model (from which the ADP is compiled), is viable. On ADP acceptance, the ADP description is written to a standard location, usually /etc/hzDelta/.
//
// On subsequent runs the ADP description is compared to that stored in /etc/hzDelta/. The two ADP descriptions will be identical if there has been no change to the data model
// since the last run. If the data model has changed the ADP descriptions will not match. The new ADP can contain entities that do not exist in the old, and can omit entities
// that do exists in the old. However where the entity exists in both, the two ADPs must exactly agree on every detail of the entity. The program terminates if this comparison
// fails.
//
// Note that ADP comparison does not involve the DS. Once the comparison succeeeds, if the application is delta enabled, the next step is to seek a connection to the DS. While
// the DS is intended to be omnipresent, if it isn't up this function will return E_OK, allowing the application to go into service without the DS.
//
// Applications will not necessarily be delta enabled. It is standard practice for them not to be during development. Enabling a program for DS operation is a matter of adding
// a note to this effect in the configs. Not all applications will expect to receive deltas from sister applications on other machines. Those that do must add a DS handler and
// port, to the hzIpServer singleton.
//
// Note that the DS reads ADP descriptions from the standard location. The ADP description used to be passed to the DS by the application. This is no longer the practice.
//
// Arguments: None
//
// Returns: E_ARGUMENT If one or more arguments is null or invalid
// E_INITFAIL If there is currently no hostname and none could be established
// E_FORMAT If there is the application profile held in /ect/hzDelta.d/ is not of the expected form
// E_OPENFAIL If this function cannot write out the application's version of the profile
// E_OK If a connection to a delta server was established or if bMustHave is false
_hzfunc("hdbADP::DeltaInit") ;
static bool bBeenHere = false ;
uint32_t nIndex ; // Server iterator
hzEcode rc = E_OK ; // Return code
if (!m_appName)
return hzerr(E_SEQUENCE, "No application name") ;
if (bBeenHere)
return hzerr(E_SEQUENCE, "Repeat call") ;
// Establish hostname of this machine
if (!_hzGlobal_Hostname)
{
if (SetupHost() != E_OK)
hzexit(E_INITFAIL, "Could not setup hostname") ;
if (!_hzGlobal_Hostname)
hzexit(E_INITFAIL, "No hostname established") ;
}
// Check that application configs have defined at least one data class and repository
if (!CountDataClass())
return hzerr(E_NODATA, "No Data Clases defined") ;
if (!CountObjRepos())
return hzerr(E_NODATA, "No Data Repositories defined") ;
// Read the DS config to obtain delta cluster info and to find the appID as used by the DS
rc = ReadConfigDS() ;
if (rc != E_OK)
return rc ;
/*
** DS servers and apps now listsed. Can set m_nDeltaID and if applicable, init connection to DS
*/
for (nIndex = 0 ; nIndex < 4 ; nIndex++)
{
if (s_DS_Hosts[nIndex] == _hzGlobal_Hostname)
{
threadLog("Local Machine is member of the delta cluster\n") ;
break ;
}
}
if (s_DS_Addrs[nIndex] != _hzGlobal_nullIP && s_DS_Ports[nIndex])
{
threadLog("Local Machine is delta active\n") ;
m_nDeltaID = s_mapDS_AppsByName[m_appName] ;
}
if (!m_nDeltaID)
{
threadLog("This app (%s) is NOT delta active\n", *m_appName) ;
return E_OK ;
}
m_eDeltaMode = DELTA_MODE_AUTH ;
// Connect to DS as client
SOCKADDRUN svrAddr ; // Address of Local DS
// Create the socket
memset(&svrAddr, 0, sizeof(svrAddr)) ;
svrAddr.sun_family = AF_UNIX ;
strcpy(svrAddr.sun_path, "/home/deltasvr/DS.unix.socket") ;
if ((m_unixSock = socket(AF_UNIX, SOCK_STREAM, 0)) <= 0)
return hzerr(E_NOSOCKET, "Could not create DS socket (returns %d, errno %d)") ;
threadLog("DS: Using socket %d\n", m_unixSock) ;
// Connect stage
if (connect(m_unixSock, (struct sockaddr *) &svrAddr, sizeof(svrAddr)) < 0)
return hzerr(E_HOSTFAIL, "Could not connect to DS (errno=%d)", errno) ;
return E_OK ;
}
#endif
/*
** Delta Origination
*/
#if 0
hzEcode hdbADP::DeltaOriginateFile (const char* filepath, hzDeltaReq eOp) const
{
// Call this to sync application file change. All this does is send the pathname of the file or dir, and the operator.
_hzfunc(__func__) ;
hzEcode rc = E_OK ; // Return code
if (m_eDeltaMode == DELTA_MODE_NULL)
return E_OK ;
return rc ;
}
hzEcode hdbADP::DeltaOriginateObj (const hdbObjRepos* pObjRepos, hzChain& Zd) const
{
_hzfunc("hdbADP::DeltaOriginateObj") ;
hzChain::BlkIter bi ; // Chain block iterator
hzChain X ; // Chain to send to DS
int32_t nSend ; // Bytes to send
int32_t nSent ; // Bytes sent
uint32_t nSIL ; // Serial integer len
hzEcode rc = E_OK ; // Return code
char outBuf[64] ; // Used to state repos id etc
if (m_eDeltaMode == DELTA_MODE_NULL)
return E_OK ;
// Compile delta header
nSend = sprintf(outBuf, "a=%s;r=%s;", *m_appName, pObjRepos->txtName()) ;
nSend += 2 ;
nSend += Zd.Size() ;
// Write command
X.AddByte(DELTA_ORIG_OBJECT) ;
// Write nLen
WriteSerialUINT32(X, nSIL, nSend) ;
// Write delta header
X << outBuf ;
// Write delta
X << Zd ;
threadLog(X) ;
// Write to socket
for (bi = X ; rc == E_OK && bi.Data() ; bi.Advance())
{
nSent = write(m_unixSock, bi.Data(), bi.Size()) ;
if (nSent < 0)
rc = hzerr(E_SENDFAIL, "Send error (errno=%d)\n", errno) ;
}
return E_OK ;
}
hzEcode hdbADP::DeltaOriginateBin (const hdbObjRepos* pObjRepos, const hdbMember* pMbr, hzChain& Zd, hzDeltaReq eOp) const
{
_hzfunc("hdbADP::DeltaOriginateBin") ;
hzChain::BlkIter bi ; // Chain block iterator
hzChain X ; // Chain to send to DS
int32_t nSend ; // Bytes to send
int32_t nSent ; // Bytes sent
uint32_t nSIL ; // Serial integer len
hzEcode rc = E_OK ; // Return code
char outBuf[64] ; // Used to state repos id etc
if (m_eDeltaMode == DELTA_MODE_NULL)
return E_OK ;
// Compile delta header
nSend = sprintf(outBuf, "a=%s;r%u.m%ui@bin", *m_appName, pObjRepos->DeltaId(), pMbr->Posn()) ;
nSend += 2 ;
nSend += Zd.Size() ;
// Write command
X.AddByte(DELTA_ORIG_OBJECT) ;
// Write nLen
WriteSerialUINT32(X, nSIL, nSend) ;
// Write delta header
X << outBuf ;
// Write delta
X << Zd ;
threadLog(X) ;
// Write to socket
for (bi = X ; rc == E_OK && bi.Data() ; bi.Advance())
{
nSent = write(m_unixSock, bi.Data(), bi.Size()) ;
if (nSent < 0)
rc = hzerr(E_SENDFAIL, "Send error (errno=%d)\n", errno) ;
}
return E_OK ;
}
hzEcode hdbADP::DeltaOriginateMbr (const hdbObjRepos* pObjRepos, const hdbMember* pMbr, hzAtom value, hzDeltaReq eOp) const
{
if (m_eDeltaMode == DELTA_MODE_NULL)
return E_OK ;
return E_OK ;
}
#endif
/*
** hzDeltaClient Functions
*/
#if 0
hzEcode hzDeltaClient::SendFile (hzString& filepath)
hzEcode hzDeltaClient::DelFile (hzString& filepath)
hzEcode hzDeltaClient::DeltaWrite (hzChain& Zd)
#endif
/*
** Handle notifications from server
*/
hzTcpCode ProcDelta (hzChain& ZI, hzIpConnex* pCx)
{
// Category: System
//
// Accepts and processes notification deltas from the Delta Server.
//
// These will be of repository deltas and application files on sister applications running on other machines in the cluster. In the case of repository updates, the task is to
// locate the repository and apply the update. In the event of an uploaded file, the DS will apply the change.
//
// This function must be supplied to the hzIpServer singleton, by an AddTCPPort() call.
_hzfunc(__func__) ;
chIter zi ; // Input message iterator
//uint32_t sessId ;
char res [8] ;
/*
if (!_hzGlobal_DeltaClient)
{
res[0] = res[1] = res[2] = res[3] = 0 ;
res[4] = DELTA_SVR_NACK ;
res[5] = res[6] = res[7] = 0 ;
}
else
{
// Formulate command
sessId = _hzGlobal_DeltaClient->SessID() ;
res[0] = (sessId & 0xff000000) >> 24 ;
res[1] = (sessId & 0xff0000) >> 16 ;
res[2] = (sessId & 0xff00) >> 8 ;
res[3] = sessId & 0xff ;
res[4] = DELTA_SVR_ACK ;
res[5] = res[6] = res[7] = 0 ;
}
*/
if (write(pCx->CliSocket(), res, 8) < 0)
{
hzerr(E_SENDFAIL, "Send failed, killing connection\n") ;
return TCP_TERMINATE ;
}
return TCP_KEEPALIVE ;
}