00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035 #include "ocilib_internal.h"
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045 OCI_Dequeue * OCI_API OCI_DequeueCreate
00046 (
00047 OCI_TypeInfo *typinf,
00048 const mtext *name
00049 )
00050 {
00051 OCI_Dequeue *dequeue = NULL;
00052 boolean res = TRUE;
00053
00054 OCI_CHECK_INITIALIZED(NULL);
00055
00056 OCI_CHECK_PTR(OCI_IPC_TYPE_INFO, typinf, NULL);
00057 OCI_CHECK_PTR(OCI_IPC_STRING, name, NULL);
00058
00059
00060
00061 dequeue = (OCI_Dequeue *) OCI_MemAlloc(OCI_IPC_DEQUEUE, sizeof(*dequeue), (size_t) 1, TRUE);
00062
00063 if (dequeue != NULL)
00064 {
00065 dequeue->typinf = typinf;
00066 dequeue->name = mtsdup(name);
00067
00068
00069
00070 res = (OCI_SUCCESS == OCI_DescriptorAlloc((dvoid * ) OCILib.env,
00071 (dvoid **) &dequeue->opth,
00072 OCI_DTYPE_AQDEQ_OPTIONS,
00073 (size_t) 0, (dvoid **) NULL));
00074
00075
00076
00077 if (res == TRUE)
00078 {
00079 dequeue->msg = OCI_MsgCreate(dequeue->typinf);
00080 }
00081
00082 res = (dequeue->msg != NULL);
00083 }
00084 else
00085 res = FALSE;
00086
00087
00088
00089 if (res == FALSE)
00090 {
00091 OCI_DequeueFree(dequeue);
00092 dequeue = NULL;
00093 }
00094
00095 return dequeue;
00096 }
00097
00098
00099
00100
00101
00102 boolean OCI_API OCI_DequeueFree
00103 (
00104 OCI_Dequeue *dequeue
00105 )
00106 {
00107 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00108
00109
00110
00111 if (dequeue->msg != NULL)
00112 {
00113 OCI_MsgFree(dequeue->msg);
00114 }
00115
00116
00117
00118 if (dequeue->agent != NULL)
00119 {
00120 OCI_AgentFree(dequeue->agent);
00121 }
00122
00123
00124
00125 OCI_DescriptorFree((dvoid *) dequeue->opth, OCI_DTYPE_AQDEQ_OPTIONS);
00126
00127
00128
00129 OCI_FREE(dequeue->name);
00130 OCI_FREE(dequeue->pattern);
00131 OCI_FREE(dequeue->consumer);
00132
00133
00134
00135 OCI_FREE(dequeue->agent_list);
00136
00137 OCI_FREE(dequeue);
00138
00139 return TRUE;
00140 }
00141
00142
00143
00144
00145
00146 OCI_Agent * OCI_API OCI_DequeueListen
00147 (
00148 OCI_Dequeue *dequeue,
00149 int timeout
00150 )
00151 {
00152 boolean res = TRUE;
00153 OCI_Agent *agent = NULL;
00154 OCIAQAgent *handle = NULL;
00155
00156 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, NULL);
00157
00158
00159
00160 if (dequeue->agent_list != NULL)
00161 {
00162 sword ret;
00163 sb4 code;
00164
00165 ret = OCIAQListen(dequeue->typinf->con->cxt, dequeue->typinf->con->err,
00166 dequeue->agent_list, (ub4) dequeue->agent_count,
00167 (sb4) timeout, &handle, OCI_DEFAULT);
00168
00169
00170
00171 if (ret == OCI_ERROR)
00172 {
00173 OCIErrorGet((dvoid *) dequeue->typinf->con->err, (ub4) 1,
00174 (OraText *) NULL, &code, (OraText *) NULL, (ub4) 0,
00175 (ub4) OCI_HTYPE_ERROR);
00176
00177
00178
00179 if (code != OCI_ERR_AQ_LISTEN_TIMEOUT)
00180 {
00181 OCI_ExceptionOCI(dequeue->typinf->con->err, dequeue->typinf->con, NULL, FALSE);
00182
00183 res = FALSE;
00184 }
00185 }
00186
00187
00188
00189 if ((res == TRUE) && (ret == OCI_SUCCESS) && (handle != NULL))
00190 {
00191 agent = OCI_AgentInit(dequeue->typinf->con, &dequeue->agent, handle, NULL, NULL);
00192 }
00193 }
00194
00195 OCI_RESULT(res);
00196
00197 return agent;
00198 }
00199
00200
00201
00202
00203
00204 OCI_Msg * OCI_API OCI_DequeueGet
00205 (
00206 OCI_Dequeue *dequeue
00207 )
00208 {
00209 boolean res = TRUE;
00210 void *ostr = NULL;
00211 int osize = -1;
00212 OCI_Msg *msg = NULL;
00213 sword ret = OCI_SUCCESS;
00214 void *p_ind = NULL;
00215
00216 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, NULL);
00217
00218
00219
00220 res = OCI_MsgReset(dequeue->msg);
00221
00222 if (res == TRUE)
00223 {
00224 ostr = OCI_GetInputMetaString(dequeue->name, &osize);
00225
00226 if (dequeue->typinf->tcode == OCI_UNKNOWN)
00227 {
00228 p_ind = &dequeue->msg->ind;
00229 }
00230
00231
00232
00233 ret = OCIAQDeq(dequeue->typinf->con->cxt, dequeue->typinf->con->err,
00234 ostr, dequeue->opth, dequeue->msg->proph, dequeue->typinf->tdo,
00235 &dequeue->msg->payload, (void **) &p_ind, &dequeue->msg->id, OCI_DEFAULT);
00236
00237 OCI_ReleaseMetaString(ostr);
00238
00239
00240
00241 if (ret == OCI_ERROR)
00242 {
00243 sb4 code = 0;
00244
00245 OCIErrorGet((dvoid *) dequeue->typinf->con->err, (ub4) 1,
00246 (OraText *) NULL, &code, (OraText *) NULL, (ub4) 0,
00247 (ub4) OCI_HTYPE_ERROR);
00248
00249
00250
00251 if (code != OCI_ERR_AQ_DEQUEUE_TIMEOUT)
00252 {
00253 OCI_ExceptionOCI(dequeue->typinf->con->err, dequeue->typinf->con, NULL, FALSE);
00254
00255 res = FALSE;
00256 }
00257 }
00258 }
00259
00260
00261
00262 if ((res == TRUE) && (ret == OCI_SUCCESS))
00263 {
00264
00265
00266 if (dequeue->typinf->tcode != OCI_UNKNOWN)
00267 {
00268 if ((p_ind != NULL) && ((*(OCIInd *) p_ind) != OCI_IND_NULL))
00269 {
00270 dequeue->msg->ind = *(OCIInd *) p_ind;
00271
00272 dequeue->msg->obj = OCI_ObjectInit(dequeue->typinf->con,
00273 (OCI_Object **) &dequeue->msg->obj,
00274 dequeue->msg->payload, dequeue->typinf,
00275 NULL, -1, TRUE);
00276
00277 res = dequeue->msg->obj != NULL;
00278 }
00279 }
00280 }
00281
00282
00283
00284 if ((res == TRUE) && (ret == OCI_SUCCESS))
00285 {
00286 msg = dequeue->msg;
00287 }
00288
00289 OCI_RESULT(res);
00290
00291 return msg;
00292 }
00293
00294
00295
00296
00297
00298 const mtext * OCI_API OCI_DequeueGetConsumer
00299 (
00300 OCI_Dequeue *dequeue
00301 )
00302 {
00303 boolean res = TRUE;
00304
00305 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, NULL);
00306
00307 if (dequeue->consumer == NULL)
00308 {
00309 res = OCI_StringGetFromAttrHandle(dequeue->typinf->con,
00310 dequeue->opth,
00311 OCI_DTYPE_AQDEQ_OPTIONS,
00312 OCI_ATTR_CONSUMER_NAME,
00313 &dequeue->consumer);
00314 }
00315
00316 OCI_RESULT(res);
00317
00318 return dequeue->consumer;
00319 }
00320
00321
00322
00323
00324
00325 boolean OCI_API OCI_DequeueSetConsumer
00326 (
00327 OCI_Dequeue *dequeue,
00328 const mtext *consumer
00329 )
00330 {
00331 boolean res = TRUE;
00332
00333 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00334
00335 res = OCI_StringSetToAttrHandle(dequeue->typinf->con,
00336 dequeue->opth,
00337 OCI_DTYPE_AQDEQ_OPTIONS,
00338 OCI_ATTR_CONSUMER_NAME,
00339 &dequeue->consumer,
00340 consumer);
00341
00342 OCI_RESULT(res);
00343
00344 return res;
00345 }
00346
00347
00348
00349
00350
00351 const mtext * OCI_API OCI_DequeueGetCorrelation
00352 (
00353 OCI_Dequeue *dequeue
00354 )
00355 {
00356 boolean res = TRUE;
00357
00358 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, NULL);
00359
00360 if (dequeue->pattern == NULL)
00361 {
00362 res = OCI_StringGetFromAttrHandle(dequeue->typinf->con,
00363 dequeue->opth,
00364 OCI_DTYPE_AQDEQ_OPTIONS,
00365 OCI_ATTR_CORRELATION,
00366 &dequeue->pattern);
00367 }
00368
00369 OCI_RESULT(res);
00370
00371 return dequeue->pattern;
00372 }
00373
00374
00375
00376
00377
00378 boolean OCI_API OCI_DequeueSetCorrelation
00379 (
00380 OCI_Dequeue *dequeue,
00381 const mtext *pattern
00382 )
00383 {
00384 boolean res = TRUE;
00385
00386 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00387
00388 res = OCI_StringSetToAttrHandle(dequeue->typinf->con,
00389 dequeue->opth,
00390 OCI_DTYPE_AQDEQ_OPTIONS,
00391 OCI_ATTR_CORRELATION,
00392 &dequeue->pattern,
00393 pattern);
00394
00395 OCI_RESULT(res);
00396
00397 return res;
00398 }
00399
00400
00401
00402
00403
00404 boolean OCI_API OCI_DequeueGetRelativeMsgID
00405 (
00406 OCI_Dequeue *dequeue,
00407 void *id,
00408 unsigned int *len
00409 )
00410 {
00411 boolean res = TRUE;
00412 OCIRaw *value = NULL;
00413
00414 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00415 OCI_CHECK_PTR(OCI_IPC_VOID, id, FALSE);
00416 OCI_CHECK_PTR(OCI_IPC_VOID, len, FALSE);
00417
00418 OCI_CALL2
00419 (
00420 res, dequeue->typinf->con,
00421
00422 OCIAttrGet((dvoid *) dequeue->opth,
00423 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00424 (dvoid *) &value,
00425 (ub4 *) NULL,
00426 (ub4 ) OCI_ATTR_DEQ_MSGID,
00427 dequeue->typinf->con->err)
00428 )
00429
00430 if (value != NULL)
00431 {
00432 ub4 raw_len = 0;
00433
00434 raw_len = OCIRawSize(OCILib.env, value);
00435
00436 if (*len > raw_len)
00437 *len = raw_len;
00438
00439 memcpy(id, OCIRawPtr(OCILib.env, value), (size_t) (*len));
00440 }
00441 else
00442 {
00443 *len = 0;
00444 }
00445
00446 OCI_RESULT(res);
00447
00448 return res;
00449 }
00450
00451
00452
00453
00454
00455 boolean OCI_API OCI_DequeueSetRelativeMsgID
00456 (
00457 OCI_Dequeue *dequeue,
00458 const void *id,
00459 unsigned int len
00460 )
00461 {
00462 boolean res = TRUE;
00463 OCIRaw *value = NULL;
00464
00465 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00466
00467 OCI_CALL2
00468 (
00469 res, dequeue->typinf->con,
00470
00471 OCIRawAssignBytes(OCILib.env, dequeue->typinf->con->err,
00472 (ub1*) id, (ub4) len, (OCIRaw **) &value)
00473 )
00474
00475 OCI_CALL2
00476 (
00477 res, dequeue->typinf->con,
00478
00479 OCIAttrSet((dvoid *) dequeue->opth,
00480 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00481 (dvoid *) &value,
00482 (ub4 ) 0,
00483 (ub4 ) OCI_ATTR_DEQ_MSGID,
00484 dequeue->typinf->con->err)
00485 )
00486
00487 OCI_RESULT(res);
00488
00489 return res;
00490 }
00491
00492
00493
00494
00495
00496 unsigned int OCI_API OCI_DequeueGetVisibility
00497 (
00498 OCI_Dequeue *dequeue
00499 )
00500 {
00501 boolean res = TRUE;
00502 ub4 ret = 0;
00503
00504 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, 0);
00505
00506 OCI_CALL2
00507 (
00508 res, dequeue->typinf->con,
00509
00510 OCIAttrGet((dvoid *) dequeue->opth,
00511 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00512 (dvoid *) &ret,
00513 (ub4 *) NULL,
00514 (ub4 ) OCI_ATTR_VISIBILITY,
00515 dequeue->typinf->con->err)
00516 )
00517
00518 OCI_RESULT(res);
00519
00520 return (int) ret;
00521 }
00522
00523
00524
00525
00526
00527 boolean OCI_API OCI_DequeueSetVisibility
00528 (
00529 OCI_Dequeue *dequeue,
00530 unsigned int visibility
00531 )
00532 {
00533 boolean res = TRUE;
00534 ub4 value = (ub4) visibility;
00535
00536 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00537
00538 OCI_CALL2
00539 (
00540 res, dequeue->typinf->con,
00541
00542 OCIAttrSet((dvoid *) dequeue->opth,
00543 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00544 (dvoid *) &value,
00545 (ub4 ) 0,
00546 (ub4 ) OCI_ATTR_VISIBILITY,
00547 dequeue->typinf->con->err)
00548 )
00549
00550 OCI_RESULT(res);
00551
00552 return res;
00553 }
00554
00555
00556
00557
00558
00559 unsigned int OCI_API OCI_DequeueGetMode
00560 (
00561 OCI_Dequeue *dequeue
00562 )
00563 {
00564 boolean res = TRUE;
00565 ub4 ret = 0;
00566
00567 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, 0);
00568
00569 OCI_CALL2
00570 (
00571 res, dequeue->typinf->con,
00572
00573 OCIAttrGet((dvoid *) dequeue->opth,
00574 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00575 (dvoid *) &ret,
00576 (ub4 *) NULL,
00577 (ub4 ) OCI_ATTR_DEQ_MODE,
00578 dequeue->typinf->con->err)
00579 )
00580
00581 OCI_RESULT(res);
00582
00583 return (int) ret;
00584 }
00585
00586
00587
00588
00589
00590 boolean OCI_API OCI_DequeueSetMode
00591 (
00592 OCI_Dequeue *dequeue,
00593 unsigned int mode
00594 )
00595 {
00596 boolean res = TRUE;
00597 ub4 value = (ub4) mode;
00598
00599 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00600
00601 OCI_CALL2
00602 (
00603 res, dequeue->typinf->con,
00604
00605 OCIAttrSet((dvoid *) dequeue->opth,
00606 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00607 (dvoid *) &value,
00608 (ub4 ) 0,
00609 (ub4 ) OCI_ATTR_DEQ_MODE,
00610 dequeue->typinf->con->err)
00611 )
00612
00613 OCI_RESULT(res);
00614
00615 return res;
00616 }
00617
00618
00619
00620
00621
00622 unsigned int OCI_API OCI_DequeueGetNavigation
00623 (
00624 OCI_Dequeue *dequeue
00625 )
00626 {
00627 boolean res = TRUE;
00628 ub4 ret = 0;
00629
00630 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, 0);
00631
00632 OCI_CALL2
00633 (
00634 res, dequeue->typinf->con,
00635
00636 OCIAttrGet((dvoid *) dequeue->opth,
00637 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00638 (dvoid *) &ret,
00639 (ub4 *) NULL,
00640 (ub4 ) OCI_ATTR_NAVIGATION,
00641 dequeue->typinf->con->err)
00642 )
00643
00644 OCI_RESULT(res);
00645
00646 return (int) ret;
00647 }
00648
00649
00650
00651
00652
00653 boolean OCI_API OCI_DequeueSetNavigation
00654 (
00655 OCI_Dequeue *dequeue,
00656 unsigned int position
00657 )
00658 {
00659 boolean res = TRUE;
00660 ub4 value = (ub4) position;
00661
00662 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00663
00664 OCI_CALL2
00665 (
00666 res, dequeue->typinf->con,
00667
00668 OCIAttrSet((dvoid *) dequeue->opth,
00669 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00670 (dvoid *) &value,
00671 (ub4 ) 0,
00672 (ub4 ) OCI_ATTR_NAVIGATION,
00673 dequeue->typinf->con->err)
00674 )
00675
00676 OCI_RESULT(res);
00677
00678 return res;
00679 }
00680
00681
00682
00683
00684
00685 int OCI_API OCI_DequeueGetWaitTime
00686 (
00687 OCI_Dequeue *dequeue
00688 )
00689 {
00690 boolean res = TRUE;
00691 sb4 ret = 0;
00692
00693 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, 0);
00694
00695 OCI_CALL2
00696 (
00697 res, dequeue->typinf->con,
00698
00699 OCIAttrGet((dvoid *) dequeue->opth,
00700 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00701 (dvoid *) &ret,
00702 (ub4 *) NULL,
00703 (ub4 ) OCI_ATTR_WAIT,
00704 dequeue->typinf->con->err)
00705 )
00706
00707 OCI_RESULT(res);
00708
00709 return (int) ret;
00710 }
00711
00712
00713
00714
00715
00716 boolean OCI_API OCI_DequeueSetWaitTime
00717 (
00718 OCI_Dequeue *dequeue,
00719 int timeout
00720 )
00721 {
00722 boolean res = TRUE;
00723 sb4 value = (ub4) timeout;
00724
00725 OCI_CHECK_PTR(OCI_IPC_DEQUEUE, dequeue, FALSE);
00726
00727 OCI_CALL2
00728 (
00729 res, dequeue->typinf->con,
00730
00731 OCIAttrSet((dvoid *) dequeue->opth,
00732 (ub4 ) OCI_DTYPE_AQDEQ_OPTIONS,
00733 (dvoid *) &value,
00734 (ub4 ) 0,
00735 (ub4 ) OCI_ATTR_WAIT,
00736 dequeue->typinf->con->err)
00737 )
00738
00739 OCI_RESULT(res);
00740
00741 return res;
00742 }
00743
00744
00745
00746
00747
00748 boolean OCI_API OCI_DequeueSetAgentList
00749 (
00750 OCI_Dequeue *dequeue,
00751 OCI_Agent **consumers,
00752 unsigned int count
00753 )
00754 {
00755 boolean res = TRUE;
00756
00757 OCI_CHECK_PTR(OCI_IPC_ENQUEUE, dequeue, FALSE);
00758
00759 OCI_FREE(dequeue->agent_list);
00760
00761 if ((consumers != NULL) && (count > 0))
00762 {
00763 dequeue->agent_list = (OCIAQAgent **) OCI_MemAlloc(OCI_IPC_ARRAY,
00764 sizeof(OCIAQAgent *),
00765 count, FALSE);
00766
00767 if (dequeue->agent_list != NULL)
00768 {
00769 unsigned int i;
00770
00771 for(i = 0; i < count; i++)
00772 {
00773 dequeue->agent_list[i] = consumers[i]->handle;
00774 }
00775
00776 dequeue->agent_count = (ub4) count;
00777 }
00778 else
00779 {
00780 res = FALSE;
00781 }
00782 }
00783
00784 OCI_RESULT(res);
00785
00786 return res;
00787 }