24 #include "kmessageio.h"
33 : TQObject (parent, name), m_id (0)
55 mSocket =
new TQSocket ();
56 mSocket->connectToHost (host, port);
61 *parent,
const char *name)
64 mSocket =
new TQSocket ();
65 mSocket->connectToHost (host.toString(), port);
81 mSocket =
new TQSocket ();
82 mSocket->setSocket (socketFD);
93 return mSocket->state() == TQSocket::Connection;
98 TQDataStream str (mSocket);
99 str << TQ_UINT8 (
'M');
100 str.writeBytes (msg.data(), msg.size());
103 void KMessageSocket::processNewData ()
109 TQDataStream str (mSocket);
110 while (mSocket->bytesAvailable() > 0)
115 if (mSocket->bytesAvailable() < 5)
128 kdWarning(11001) << k_funcinfo <<
": Received unexpected data, magic number wrong!" << endl;
132 str >> mNextBlockLength;
133 mAwaitingHeader =
false;
138 if (mSocket->bytesAvailable() < (TQ_ULONG) mNextBlockLength)
144 TQByteArray msg (mNextBlockLength);
145 str.readRawBytes (msg.data(), mNextBlockLength);
151 mAwaitingHeader =
true;
158 void KMessageSocket::initSocket ()
160 connect (mSocket, TQT_SIGNAL (error(
int)), TQT_SIGNAL (
connectionBroken()));
161 connect (mSocket, TQT_SIGNAL (connectionClosed()), TQT_SIGNAL (
connectionBroken()));
162 connect (mSocket, TQT_SIGNAL (readyRead()), TQT_SLOT (processNewData()));
163 mAwaitingHeader =
true;
164 mNextBlockLength = 0;
170 return mSocket->peerPort();
175 return mSocket->peerName();
189 if (partner && partner->mPartner)
191 kdWarning(11001) << k_funcinfo <<
": Object is already connected!" << endl;
199 partner->mPartner =
this;
206 mPartner->mPartner = 0;
213 return mPartner != 0;
221 kdError(11001) << k_funcinfo <<
": Not yet connected!" << endl;
227 KMessageProcess::~KMessageProcess()
229 kdDebug(11001) <<
"@@@KMessageProcess::Delete process" << endl;
236 mQueue.setAutoDelete(
true);
241 KMessageProcess::KMessageProcess(TQObject *parent, TQString file) :
KMessageIO(parent,0)
244 kdDebug(11001) <<
"@@@KMessageProcess::Start process" << endl;
246 mProcess=
new TDEProcess;
248 *mProcess << mProcessName << TQString(
"%1").arg(
id);
249 kdDebug(11001) <<
"@@@KMessageProcess::Init:Id= " <<
id << endl;
250 kdDebug(11001) <<
"@@@KMessgeProcess::Init:Processname: " << mProcessName << endl;
251 connect(mProcess, TQT_SIGNAL(receivedStdout(TDEProcess *,
char *,
int )),
252 this, TQT_SLOT(slotReceivedStdout(TDEProcess *,
char * ,
int )));
253 connect(mProcess, TQT_SIGNAL(receivedStderr(TDEProcess *,
char *,
int )),
254 this, TQT_SLOT(slotReceivedStderr(TDEProcess *,
char * ,
int )));
255 connect(mProcess, TQT_SIGNAL(processExited(TDEProcess *)),
256 this, TQT_SLOT(slotProcessExited(TDEProcess *)));
257 connect(mProcess, TQT_SIGNAL(wroteStdin(TDEProcess *)),
258 this, TQT_SLOT(slotWroteStdin(TDEProcess *)));
259 mProcess->start(TDEProcess::NotifyOnExit,TDEProcess::All);
262 mReceiveBuffer.resize(1024);
264 bool KMessageProcess::isConnected()
const
266 kdDebug(11001) <<
"@@@KMessageProcess::Is conencted" << endl;
267 if (!mProcess)
return false;
268 return mProcess->isRunning();
270 void KMessageProcess::send(
const TQByteArray &msg)
272 kdDebug(11001) <<
"@@@KMessageProcess:: SEND("<<msg.size()<<
") to process" << endl;
273 unsigned int size=msg.size()+2*
sizeof(long);
275 char *tmpbuffer=
new char[size];
276 long *p1=(
long *)tmpbuffer;
278 kdDebug(11001) <<
"p1="<<p1 <<
"p2="<< p2 << endl;
279 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
283 TQByteArray *buffer=
new TQByteArray();
284 buffer->assign(tmpbuffer,size);
286 mQueue.enqueue(buffer);
289 void KMessageProcess::writeToProcess()
292 if (mSendBuffer || mQueue.isEmpty())
return ;
293 mSendBuffer=mQueue.dequeue();
294 if (!mSendBuffer) return ;
300 mProcess->writeStdin(mSendBuffer->data(),mSendBuffer->size());
303 void KMessageProcess::slotWroteStdin(TDEProcess * )
305 kdDebug(11001) << k_funcinfo << endl;
314 void KMessageProcess::slotReceivedStderr(TDEProcess * proc,
char *buffer,
int buflen)
322 if (!buffer || buflen==0) return ;
323 if (proc) pid=proc->pid();
329 p=(
char *)memchr(pos,
'\n',buflen);
334 a.setRawData(pos,len);
336 kdDebug(11001) <<
"PID" <<pid<<
":" << s << endl;
337 a.resetRawData(pos,len);
344 void KMessageProcess::slotReceivedStdout(TDEProcess * ,
char *buffer,
int buflen)
346 kdDebug(11001) <<
"$$$$$$ " << k_funcinfo <<
": Received " << buflen <<
" bytes over inter process communication" << endl;
349 while (mReceiveCount+buflen>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
350 memcpy(mReceiveBuffer.data()+mReceiveCount,buffer,buflen);
351 mReceiveCount+=buflen;
354 while (mReceiveCount>2*
sizeof(
long))
356 long *p1=(
long *)mReceiveBuffer.data();
361 kdDebug(11001) << k_funcinfo <<
": Cookie error...transmission failure...serious problem..." << endl;
365 if (len<2*
sizeof(
long))
367 kdDebug(11001) << k_funcinfo <<
": Message size error" << endl;
370 if (len<=mReceiveCount)
372 kdDebug(11001) << k_funcinfo <<
": Got message with len " << len << endl;
376 msg.duplicate(mReceiveBuffer.data()+2*
sizeof(long),len-2*
sizeof(
long));
380 if (len<mReceiveCount)
382 memmove(mReceiveBuffer.data(),mReceiveBuffer.data()+len,mReceiveCount-len);
390 void KMessageProcess::slotProcessExited(TDEProcess * )
392 kdDebug(11001) <<
"Process exited (slot)" << endl;
393 emit connectionBroken();
400 KMessageFilePipe::KMessageFilePipe(TQObject *parent,TQFile *readfile,TQFile *writefile) :
KMessageIO(parent,0)
403 mWriteFile=writefile;
405 mReceiveBuffer.resize(1024);
408 KMessageFilePipe::~KMessageFilePipe()
412 bool KMessageFilePipe::isConnected ()
const
414 return (mReadFile!=0)&&(mWriteFile!=0);
417 void KMessageFilePipe::send(
const TQByteArray &msg)
419 unsigned int size=msg.size()+2*
sizeof(long);
421 char *tmpbuffer=
new char[size];
422 long *p1=(
long *)tmpbuffer;
424 memcpy(tmpbuffer+2*
sizeof(
long),msg.data(),msg.size());
429 buffer.assign(tmpbuffer,size);
430 mWriteFile->writeBlock(buffer);
439 void KMessageFilePipe::exec()
445 int ch=mReadFile->getch();
447 while (mReceiveCount>=mReceiveBuffer.size()) mReceiveBuffer.resize(mReceiveBuffer.size()+1024);
448 mReceiveBuffer[mReceiveCount]=(char)ch;
452 if (mReceiveCount>=2*
sizeof(
long))
454 long *p1=(
long *)mReceiveBuffer.data();
459 fprintf(stderr,
"KMessageFilePipe::exec:: Cookie error...transmission failure...serious problem...\n");
463 if (len==mReceiveCount)
469 msg.duplicate(mReceiveBuffer.data()+2*
sizeof(long),len-2*
sizeof(
long));
482 #include "kmessageio.moc"
~KMessageIO()
The usual destructor, does nothing special.
KMessageSocket(TQString host, TQ_UINT16 port, TQObject *parent=0, const char *name=0)
Connects to a server socket on the given host at the given port.
void received(const TQByteArray &msg)
This signal is emitted when send() on the connected KMessageIO object is called.
void send(const TQByteArray &msg)
Overridden slot method from KMessageIO.
This class implements the message communication using function calls directly.
virtual TQString peerName() const
bool isConnected() const
Returns true if the object is connected to another instance.
This abstract base class represents one end of a message connection between two clients.
bool isConnected() const
Returns true if the socket is in the connected state.
void setId(TQ_UINT32 id)
Sets the ID number of this object.
virtual TQ_UINT16 peerPort() const
KMessageIO(TQObject *parent=0, const char *name=0)
The usual TQObject constructor, does nothing else.
~KMessageSocket()
Destructor, closes the socket.
void connectionBroken()
This signal is emitted when the connection is closed.
void send(const TQByteArray &msg)
Overridden slot method from KMessageIO.
KMessageDirect(KMessageDirect *partner=0, TQObject *parent=0, const char *name=0)
Creates an object and connects it to the object given in the first parameter.
~KMessageDirect()
Destructor, closes the connection.
TQ_UINT32 id()
Queries the ID of this object.